author     Tianon Gravi <admwiggin@gmail.com>  2022-08-25 11:51:42 -0700
committer  GitHub <noreply@github.com>  2022-08-25 11:51:42 -0700
commit     0ec426a57b9bcdd6cbd6fe98f7e15ebfc12d0e14 (patch)
tree       85774a84f0199c1a6f5814bfd4b0a23ffbea595b
parent     5f698d112134d27ffe9850bfd46708bd9fa35860 (diff)
parent     a09f8dbe6eabd5bf1e9740bc09a721bfaeaa2583 (diff)
Merge pull request #43564 from corhere/libcontainerd-overhaul
Refactor libcontainerd to minimize containerd RPCs
-rw-r--r--  api/swagger.yaml  3
-rw-r--r--  container/container.go  50
-rw-r--r--  container/exec.go (renamed from daemon/exec/exec.go)  91
-rw-r--r--  container/state.go  40
-rw-r--r--  container/state_test.go  16
-rw-r--r--  daemon/checkpoint.go  9
-rw-r--r--  daemon/daemon.go  106
-rw-r--r--  daemon/daemon_unix.go  9
-rw-r--r--  daemon/daemon_windows.go  12
-rw-r--r--  daemon/delete.go  9
-rw-r--r--  daemon/delete_test.go  2
-rw-r--r--  daemon/exec.go  66
-rw-r--r--  daemon/exec_linux.go  13
-rw-r--r--  daemon/exec_linux_test.go  6
-rw-r--r--  daemon/exec_windows.go  7
-rw-r--r--  daemon/health.go  19
-rw-r--r--  daemon/inspect.go  12
-rw-r--r--  daemon/inspect_linux.go  3
-rw-r--r--  daemon/inspect_test.go  3
-rw-r--r--  daemon/inspect_windows.go  3
-rw-r--r--  daemon/kill.go  11
-rw-r--r--  daemon/monitor.go  74
-rw-r--r--  daemon/pause.go  9
-rw-r--r--  daemon/resize.go  13
-rw-r--r--  daemon/resize_test.go  74
-rw-r--r--  daemon/start.go  34
-rw-r--r--  daemon/top_unix.go  26
-rw-r--r--  daemon/top_windows.go  21
-rw-r--r--  daemon/unpause.go  6
-rw-r--r--  daemon/update.go  23
-rw-r--r--  daemon/util_test.go  74
-rw-r--r--  integration/container/wait_test.go  66
-rw-r--r--  libcontainerd/local/local_windows.go  803
-rw-r--r--  libcontainerd/local/process_windows.go  4
-rw-r--r--  libcontainerd/remote/client.go  479
-rw-r--r--  libcontainerd/remote/client_linux.go  9
-rw-r--r--  libcontainerd/remote/client_windows.go  2
-rw-r--r--  libcontainerd/replace.go  62
-rw-r--r--  libcontainerd/types/types.go  65
-rw-r--r--  plugin/executor/containerd/containerd.go  126
40 files changed, 1262 insertions(+), 1198 deletions(-)
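
The unifying change in this merge: instead of issuing containerd RPCs by container ID through the libcontainerd client (daemon.containerd.Stats(ctx, c.ID), daemon.containerd.Pause(ctx, c.ID), and so on), the daemon caches live Container and Task handles on the container State and calls methods on those. A minimal, runnable sketch of the resulting calling convention, with toy types standing in for the real libcontainerd interfaces:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// Toy stand-in; the real interfaces live in libcontainerd/types.
type Task interface {
	Pid() uint32
}

// State mirrors the new shape: the task handle is unexported, so callers
// must go through GetRunningTask and handle its error.
type State struct {
	sync.Mutex
	Running bool
	task    Task
}

func (s *State) GetRunningTask() (Task, error) {
	if !s.Running {
		return nil, errors.New("container is not running") // errdefs.Conflict in the real code
	}
	if s.task == nil {
		return nil, errors.New("running but no task set") // errdefs.System in the real code
	}
	return s.task, nil
}

func main() {
	s := &State{}
	s.Lock()
	tsk, err := s.GetRunningTask()
	s.Unlock()
	if err != nil {
		fmt.Println(err) // container is not running
		return
	}
	fmt.Println(tsk.Pid()) // subsequent RPCs go through the cached handle
}
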
diff --git a/api/swagger.yaml b/api/swagger.yaml
index 181c80a418..622441b268 100644
--- a/api/swagger.yaml
+++ b/api/swagger.yaml
@@ -4650,7 +4650,8 @@ definitions:
example: false
OOMKilled:
description: |
- Whether this container has been killed because it ran out of memory.
+ Whether a process within this container has been killed because it ran
+ out of memory since the container was last started.
type: "boolean"
example: false
Dead:
diff --git a/container/container.go b/container/container.go
index d0f70e139e..09fab63511 100644
--- a/container/container.go
+++ b/container/container.go
@@ -19,7 +19,6 @@ import (
mounttypes "github.com/docker/docker/api/types/mount"
swarmtypes "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/container/stream"
- "github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog"
"github.com/docker/docker/daemon/logger/local"
@@ -28,6 +27,7 @@ import (
"github.com/docker/docker/errdefs"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
+ libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/docker/docker/pkg/containerfs"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/ioutils"
@@ -53,9 +53,6 @@ type ExitStatus struct {
// The exit code with which the container exited.
ExitCode int
- // Whether the container encountered an OOM.
- OOMKilled bool
-
// Time at which the container died
ExitedAt time.Time
}
@@ -89,7 +86,7 @@ type Container struct {
HasBeenManuallyRestarted bool `json:"-"` // used to distinguish restart caused by restart policy from the manual one
MountPoints map[string]*volumemounts.MountPoint
HostConfig *containertypes.HostConfig `json:"-"` // do not serialize the host config in the json, otherwise we'll make the container unportable
- ExecCommands *exec.Store `json:"-"`
+ ExecCommands *ExecStore `json:"-"`
DependencyStore agentexec.DependencyGetter `json:"-"`
SecretReferences []*swarmtypes.SecretReference
ConfigReferences []*swarmtypes.ConfigReference
@@ -124,7 +121,7 @@ func NewBaseContainer(id, root string) *Container {
return &Container{
ID: id,
State: NewState(),
- ExecCommands: exec.NewStore(),
+ ExecCommands: NewExecStore(),
Root: root,
MountPoints: make(map[string]*volumemounts.MountPoint),
StreamConfig: stream.NewConfig(),
@@ -755,6 +752,47 @@ func (container *Container) CreateDaemonEnvironment(tty bool, linkedEnv []string
return env
}
+// RestoreTask restores the containerd container and task handles and reattaches
+// the IO for the running task. Container state is not synced with containerd's
+// state.
+//
+// An errdefs.NotFound error is returned if the container does not exist in
+// containerd. However, a nil error is returned if the task does not exist in
+// containerd.
+func (container *Container) RestoreTask(ctx context.Context, client libcontainerdtypes.Client) error {
+ container.Lock()
+ defer container.Unlock()
+ var err error
+ container.ctr, err = client.LoadContainer(ctx, container.ID)
+ if err != nil {
+ return err
+ }
+ container.task, err = container.ctr.AttachTask(ctx, container.InitializeStdio)
+ if err != nil && !errdefs.IsNotFound(err) {
+ return err
+ }
+ return nil
+}
+
+// GetRunningTask asserts that the container is running and returns the Task for
+// the container. An errdefs.Conflict error is returned if the container is not
+// in the Running state.
+//
+// A system error is returned if container is in a bad state: Running is true
+// but has a nil Task.
+//
+// The container lock must be held when calling this method.
+func (container *Container) GetRunningTask() (libcontainerdtypes.Task, error) {
+ if !container.Running {
+ return nil, errdefs.Conflict(fmt.Errorf("container %s is not running", container.ID))
+ }
+ tsk, ok := container.Task()
+ if !ok {
+ return nil, errdefs.System(errors.WithStack(fmt.Errorf("container %s is in Running state but has no containerd Task set", container.ID)))
+ }
+ return tsk, nil
+}
+
type rio struct {
cio.IO
diff --git a/daemon/exec/exec.go b/container/exec.go
index 2cf1833d7d..18e86c6a4f 100644
--- a/daemon/exec/exec.go
+++ b/container/exec.go
@@ -1,20 +1,20 @@
-package exec // import "github.com/docker/docker/daemon/exec"
+package container // import "github.com/docker/docker/container"
import (
- "context"
"runtime"
"sync"
"github.com/containerd/containerd/cio"
"github.com/docker/docker/container/stream"
+ "github.com/docker/docker/libcontainerd/types"
"github.com/docker/docker/pkg/stringid"
"github.com/sirupsen/logrus"
)
-// Config holds the configurations for execs. The Daemon keeps
+// ExecConfig holds the configurations for execs. The Daemon keeps
// track of both running and finished execs so that they can be
// examined both during and after completion.
-type Config struct {
+type ExecConfig struct {
sync.Mutex
Started chan struct{}
StreamConfig *stream.Config
@@ -25,7 +25,7 @@ type Config struct {
OpenStderr bool
OpenStdout bool
CanRemove bool
- ContainerID string
+ Container *Container
DetachKeys []byte
Entrypoint string
Args []string
@@ -34,39 +34,22 @@ type Config struct {
User string
WorkingDir string
Env []string
- Pid int
+ Process types.Process
ConsoleSize *[2]uint
}
-// NewConfig initializes the a new exec configuration
-func NewConfig() *Config {
- return &Config{
+// NewExecConfig initializes a new exec configuration
+func NewExecConfig(c *Container) *ExecConfig {
+ return &ExecConfig{
ID: stringid.GenerateRandomID(),
+ Container: c,
StreamConfig: stream.NewConfig(),
Started: make(chan struct{}),
}
}
-type rio struct {
- cio.IO
-
- sc *stream.Config
-}
-
-func (i *rio) Close() error {
- i.IO.Close()
-
- return i.sc.CloseStreams()
-}
-
-func (i *rio) Wait() {
- i.sc.Wait(context.Background())
-
- i.IO.Wait()
-}
-
// InitializeStdio is called by libcontainerd to connect the stdio.
-func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
+func (c *ExecConfig) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
c.StreamConfig.CopyToPipe(iop)
if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
@@ -81,68 +64,68 @@ func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
}
// CloseStreams closes the stdio streams for the exec
-func (c *Config) CloseStreams() error {
+func (c *ExecConfig) CloseStreams() error {
return c.StreamConfig.CloseStreams()
}
// SetExitCode sets the exec config's exit code
-func (c *Config) SetExitCode(code int) {
+func (c *ExecConfig) SetExitCode(code int) {
c.ExitCode = &code
}
-// Store keeps track of the exec configurations.
-type Store struct {
- byID map[string]*Config
- sync.RWMutex
+// ExecStore keeps track of the exec configurations.
+type ExecStore struct {
+ byID map[string]*ExecConfig
+ mu sync.RWMutex
}
-// NewStore initializes a new exec store.
-func NewStore() *Store {
- return &Store{
- byID: make(map[string]*Config),
+// NewExecStore initializes a new exec store.
+func NewExecStore() *ExecStore {
+ return &ExecStore{
+ byID: make(map[string]*ExecConfig),
}
}
// Commands returns the exec configurations in the store.
-func (e *Store) Commands() map[string]*Config {
- e.RLock()
- byID := make(map[string]*Config, len(e.byID))
+func (e *ExecStore) Commands() map[string]*ExecConfig {
+ e.mu.RLock()
+ byID := make(map[string]*ExecConfig, len(e.byID))
for id, config := range e.byID {
byID[id] = config
}
- e.RUnlock()
+ e.mu.RUnlock()
return byID
}
// Add adds a new exec configuration to the store.
-func (e *Store) Add(id string, Config *Config) {
- e.Lock()
+func (e *ExecStore) Add(id string, Config *ExecConfig) {
+ e.mu.Lock()
e.byID[id] = Config
- e.Unlock()
+ e.mu.Unlock()
}
// Get returns an exec configuration by its id.
-func (e *Store) Get(id string) *Config {
- e.RLock()
+func (e *ExecStore) Get(id string) *ExecConfig {
+ e.mu.RLock()
res := e.byID[id]
- e.RUnlock()
+ e.mu.RUnlock()
return res
}
// Delete removes an exec configuration from the store.
-func (e *Store) Delete(id string, pid int) {
- e.Lock()
+func (e *ExecStore) Delete(id string) {
+ e.mu.Lock()
delete(e.byID, id)
- e.Unlock()
+ e.mu.Unlock()
}
// List returns the list of exec ids in the store.
-func (e *Store) List() []string {
+func (e *ExecStore) List() []string {
var IDs []string
- e.RLock()
+ e.mu.RLock()
for id := range e.byID {
IDs = append(IDs, id)
}
- e.RUnlock()
+ e.mu.RUnlock()
return IDs
}
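
One detail worth noting in the rewrite above: the store's mutex changed from an embedded sync.RWMutex to an unexported field mu. That removes Lock/RLock from the exported surface, so callers can no longer take the store's lock from outside; the old daemon/exec.go did exactly that around containerd.Exec (see its diff further down). A runnable toy illustrating the difference:

package main

import (
	"fmt"
	"sync"
)

// Toy version of ExecStore with the unexported-mutex shape.
type ExecStore struct {
	byID map[string]string
	mu   sync.RWMutex
}

func NewExecStore() *ExecStore { return &ExecStore{byID: make(map[string]string)} }

func (e *ExecStore) Add(id, v string) {
	e.mu.Lock()
	e.byID[id] = v
	e.mu.Unlock()
}

func (e *ExecStore) Get(id string) string {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.byID[id]
}

func main() {
	s := NewExecStore()
	s.Add("abc", "running")
	fmt.Println(s.Get("abc")) // running
	// s.Lock() would no longer compile: the mutex is not embedded.
}
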
diff --git a/container/state.go b/container/state.go
index 1267c8694a..cdf88fe371 100644
--- a/container/state.go
+++ b/container/state.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/docker/docker/api/types"
+ libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
units "github.com/docker/go-units"
)
@@ -36,6 +37,14 @@ type State struct {
stopWaiters []chan<- StateStatus
removeOnlyWaiters []chan<- StateStatus
+
+ // The libcontainerd reference fields are unexported to force consumers
+ // to access them through the getter methods with multi-valued returns
+ // so that they can't forget to nil-check: the code won't compile unless
+ // the nil-check result is explicitly consumed or discarded.
+
+ ctr libcontainerdtypes.Container
+ task libcontainerdtypes.Task
}
// StateStatus is used to return container wait results.
@@ -260,7 +269,7 @@ func (s *State) SetExitCode(ec int) {
}
// SetRunning sets the state of the container to "running".
-func (s *State) SetRunning(pid int, initial bool) {
+func (s *State) SetRunning(ctr libcontainerdtypes.Container, tsk libcontainerdtypes.Task, initial bool) {
s.ErrorMsg = ""
s.Paused = false
s.Running = true
@@ -269,7 +278,14 @@ func (s *State) SetRunning(pid int, initial bool) {
s.Paused = false
}
s.ExitCodeValue = 0
- s.Pid = pid
+ s.ctr = ctr
+ s.task = tsk
+ if tsk != nil {
+ s.Pid = int(tsk.Pid())
+ } else {
+ s.Pid = 0
+ }
+ s.OOMKilled = false
if initial {
s.StartedAt = time.Now().UTC()
}
@@ -287,7 +303,6 @@ func (s *State) SetStopped(exitStatus *ExitStatus) {
s.FinishedAt = exitStatus.ExitedAt
}
s.ExitCodeValue = exitStatus.ExitCode
- s.OOMKilled = exitStatus.OOMKilled
s.notifyAndClear(&s.stopWaiters)
}
@@ -303,7 +318,6 @@ func (s *State) SetRestarting(exitStatus *ExitStatus) {
s.Pid = 0
s.FinishedAt = time.Now().UTC()
s.ExitCodeValue = exitStatus.ExitCode
- s.OOMKilled = exitStatus.OOMKilled
s.notifyAndClear(&s.stopWaiters)
}
@@ -405,3 +419,21 @@ func (s *State) notifyAndClear(waiters *[]chan<- StateStatus) {
}
*waiters = nil
}
+
+// C8dContainer returns a reference to the libcontainerd Container object for
+// the container and whether the reference is valid.
+//
+// The container lock must be held when calling this method.
+func (s *State) C8dContainer() (_ libcontainerdtypes.Container, ok bool) {
+ return s.ctr, s.ctr != nil
+}
+
+// Task returns a reference to the libcontainerd Task object for the container
+// and whether the reference is valid.
+//
+// The container lock must be held when calling this method.
+//
+// See also: (*Container).GetRunningTask().
+func (s *State) Task() (_ libcontainerdtypes.Task, ok bool) {
+ return s.task, s.task != nil
+}
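
The getters above return a second ok value rather than a bare, possibly-nil handle. As the struct comment explains, this makes misuse a compile-time problem: single-value use of the result does not compile, so the nil case must be explicitly consumed or discarded at every call site. A tiny self-contained sketch of the call-site shape, with a toy Task:

package main

import "fmt"

type Task interface{ Pid() uint32 }

type State struct{ task Task }

// Task mirrors the getter above: callers are forced to decide what
// "no task" means before dereferencing the handle.
func (s *State) Task() (Task, bool) { return s.task, s.task != nil }

func main() {
	var s State
	if tsk, ok := s.Task(); ok {
		fmt.Println(tsk.Pid())
	} else {
		fmt.Println("no containerd task attached")
	}
}
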
diff --git a/container/state_test.go b/container/state_test.go
index 09dfb56089..f4f22f70d6 100644
--- a/container/state_test.go
+++ b/container/state_test.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/docker/docker/api/types"
+ libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
)
func TestIsValidHealthString(t *testing.T) {
@@ -28,6 +29,13 @@ func TestIsValidHealthString(t *testing.T) {
}
}
+type mockTask struct {
+ libcontainerdtypes.Task
+ pid uint32
+}
+
+func (t *mockTask) Pid() uint32 { return t.pid }
+
func TestStateRunStop(t *testing.T) {
s := NewState()
@@ -60,7 +68,7 @@ func TestStateRunStop(t *testing.T) {
// Set the state to "Running".
s.Lock()
- s.SetRunning(i, true)
+ s.SetRunning(nil, &mockTask{pid: uint32(i)}, true)
s.Unlock()
// Assert desired state.
@@ -125,7 +133,7 @@ func TestStateTimeoutWait(t *testing.T) {
s := NewState()
s.Lock()
- s.SetRunning(0, true)
+ s.SetRunning(nil, nil, true)
s.Unlock()
// Start a wait with a timeout.
@@ -174,7 +182,7 @@ func TestCorrectStateWaitResultAfterRestart(t *testing.T) {
s := NewState()
s.Lock()
- s.SetRunning(0, true)
+ s.SetRunning(nil, nil, true)
s.Unlock()
waitC := s.Wait(context.Background(), WaitConditionNotRunning)
@@ -185,7 +193,7 @@ func TestCorrectStateWaitResultAfterRestart(t *testing.T) {
s.Unlock()
s.Lock()
- s.SetRunning(0, true)
+ s.SetRunning(nil, nil, true)
s.Unlock()
got := <-waitC
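
The mockTask in the test above uses a common Go testing idiom: embed the interface itself so the struct satisfies all of libcontainerdtypes.Task for free, then override only the method the test touches. Any unstubbed method panics on the nil embedded interface if called, which is exactly what a test wants. A self-contained illustration with a toy interface:

package main

import "fmt"

type Task interface {
	Pid() uint32
	Pause() error
	Resume() error
}

// mockTask embeds the interface and overrides just Pid.
type mockTask struct {
	Task
	pid uint32
}

func (m *mockTask) Pid() uint32 { return m.pid }

func main() {
	var t Task = &mockTask{pid: 42}
	fmt.Println(t.Pid()) // 42; calling t.Pause() would panic (nil embedded Task)
}
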
diff --git a/daemon/checkpoint.go b/daemon/checkpoint.go
index 5cbe8574ab..97acc1d897 100644
--- a/daemon/checkpoint.go
+++ b/daemon/checkpoint.go
@@ -57,8 +57,11 @@ func (daemon *Daemon) CheckpointCreate(name string, config types.CheckpointCreat
return err
}
- if !container.IsRunning() {
- return fmt.Errorf("Container %s not running", name)
+ container.Lock()
+ tsk, err := container.GetRunningTask()
+ container.Unlock()
+ if err != nil {
+ return err
}
if !validCheckpointNamePattern.MatchString(config.CheckpointID) {
@@ -70,7 +73,7 @@ func (daemon *Daemon) CheckpointCreate(name string, config types.CheckpointCreat
return fmt.Errorf("cannot checkpoint container %s: %s", name, err)
}
- err = daemon.containerd.CreateCheckpoint(context.Background(), container.ID, checkpointDir, config.Exit)
+ err = tsk.CreateCheckpoint(context.Background(), checkpointDir, config.Exit)
if err != nil {
os.RemoveAll(checkpointDir)
return fmt.Errorf("Cannot checkpoint container %s: %s", name, err)
diff --git a/daemon/daemon.go b/daemon/daemon.go
index dfacd217e5..f0fa951bf6 100644
--- a/daemon/daemon.go
+++ b/daemon/daemon.go
@@ -30,7 +30,6 @@ import (
"github.com/docker/docker/daemon/config"
ctrd "github.com/docker/docker/daemon/containerd"
"github.com/docker/docker/daemon/events"
- "github.com/docker/docker/daemon/exec"
_ "github.com/docker/docker/daemon/graphdriver/register" // register graph drivers
"github.com/docker/docker/daemon/images"
"github.com/docker/docker/daemon/logger"
@@ -75,7 +74,7 @@ type Daemon struct {
repository string
containers container.Store
containersReplica container.ViewDB
- execCommands *exec.Store
+ execCommands *container.ExecStore
imageService ImageService
configStore *config.Config
statsCollector *stats.Collector
@@ -317,40 +316,43 @@ func (daemon *Daemon) restore() error {
logger(c).Debug("restoring container")
- var (
- err error
- alive bool
- ec uint32
- exitedAt time.Time
- process libcontainerdtypes.Process
- )
+ var es *containerd.ExitStatus
- alive, _, process, err = daemon.containerd.Restore(context.Background(), c.ID, c.InitializeStdio)
- if err != nil && !errdefs.IsNotFound(err) {
+ if err := c.RestoreTask(context.Background(), daemon.containerd); err != nil && !errdefs.IsNotFound(err) {
logger(c).WithError(err).Error("failed to restore container with containerd")
return
}
- logger(c).Debugf("alive: %v", alive)
- if !alive {
- // If process is not nil, cleanup dead container from containerd.
- // If process is nil then the above `containerd.Restore` returned an errdefs.NotFoundError,
- // and docker's view of the container state will be updated accorrdingly via SetStopped further down.
- if process != nil {
- logger(c).Debug("cleaning up dead container process")
- ec, exitedAt, err = process.Delete(context.Background())
- if err != nil && !errdefs.IsNotFound(err) {
- logger(c).WithError(err).Error("failed to delete container from containerd")
- return
+
+ alive := false
+ status := containerd.Unknown
+ if tsk, ok := c.Task(); ok {
+ s, err := tsk.Status(context.Background())
+ if err != nil {
+ logger(c).WithError(err).Error("failed to get task status")
+ } else {
+ status = s.Status
+ alive = status != containerd.Stopped
+ if !alive {
+ logger(c).Debug("cleaning up dead container process")
+ es, err = tsk.Delete(context.Background())
+ if err != nil && !errdefs.IsNotFound(err) {
+ logger(c).WithError(err).Error("failed to delete task from containerd")
+ return
+ }
+ } else if !daemon.configStore.LiveRestoreEnabled {
+ logger(c).Debug("shutting down container considered alive by containerd")
+ if err := daemon.shutdownContainer(c); err != nil && !errdefs.IsNotFound(err) {
+ log.WithError(err).Error("error shutting down container")
+ return
+ }
+ status = containerd.Stopped
+ alive = false
+ c.ResetRestartManager(false)
}
}
- } else if !daemon.configStore.LiveRestoreEnabled {
- logger(c).Debug("shutting down container considered alive by containerd")
- if err := daemon.shutdownContainer(c); err != nil && !errdefs.IsNotFound(err) {
- log.WithError(err).Error("error shutting down container")
- return
- }
- c.ResetRestartManager(false)
}
+ // If the containerd task for the container was not found, docker's view of the
+ // container state will be updated accordingly via SetStopped further down.
if c.IsRunning() || c.IsPaused() {
logger(c).Debug("syncing container on disk state with real state")
@@ -359,29 +361,22 @@ func (daemon *Daemon) restore() error {
switch {
case c.IsPaused() && alive:
- s, err := daemon.containerd.Status(context.Background(), c.ID)
- if err != nil {
- logger(c).WithError(err).Error("failed to get container status")
- } else {
- logger(c).WithField("state", s).Info("restored container paused")
- switch s {
- case containerd.Paused, containerd.Pausing:
- // nothing to do
- case containerd.Stopped:
- alive = false
- case containerd.Unknown:
- log.Error("unknown status for paused container during restore")
- default:
- // running
- c.Lock()
- c.Paused = false
- daemon.setStateCounter(c)
- daemon.updateHealthMonitor(c)
- if err := c.CheckpointTo(daemon.containersReplica); err != nil {
- log.WithError(err).Error("failed to update paused container state")
- }
- c.Unlock()
+ logger(c).WithField("state", status).Info("restored container paused")
+ switch status {
+ case containerd.Paused, containerd.Pausing:
+ // nothing to do
+ case containerd.Unknown, containerd.Stopped, "":
+ log.WithField("status", status).Error("unexpected status for paused container during restore")
+ default:
+ // running
+ c.Lock()
+ c.Paused = false
+ daemon.setStateCounter(c)
+ daemon.updateHealthMonitor(c)
+ if err := c.CheckpointTo(daemon.containersReplica); err != nil {
+ log.WithError(err).Error("failed to update paused container state")
}
+ c.Unlock()
}
case !c.IsPaused() && alive:
logger(c).Debug("restoring healthcheck")
@@ -393,7 +388,12 @@ func (daemon *Daemon) restore() error {
if !alive {
logger(c).Debug("setting stopped state")
c.Lock()
- c.SetStopped(&container.ExitStatus{ExitCode: int(ec), ExitedAt: exitedAt})
+ var ces container.ExitStatus
+ if es != nil {
+ ces.ExitCode = int(es.ExitCode())
+ ces.ExitedAt = es.ExitTime()
+ }
+ c.SetStopped(&ces)
daemon.Cleanup(c)
if err := c.CheckpointTo(daemon.containersReplica); err != nil {
log.WithError(err).Error("failed to update stopped container state")
@@ -956,7 +956,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
if d.containersReplica, err = container.NewViewDB(); err != nil {
return nil, err
}
- d.execCommands = exec.NewStore()
+ d.execCommands = container.NewExecStore()
d.statsCollector = d.newStatsCollector(1 * time.Second)
d.EventsService = events.New()
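
The restore logic above reduces to a small decision tree keyed on the restored task's status and the daemon's live-restore setting. A compact, hypothetical distillation (toy types; the real branches also reconcile paused state and record exit codes):

package main

import "fmt"

type Status string

const (
	Stopped Status = "stopped"
	Running Status = "running"
	Paused  Status = "paused"
)

// decide mirrors the restore branches: a stopped task is cleaned up, and
// a live task is either adopted (live-restore) or shut down.
func decide(status Status, liveRestore bool) string {
	switch {
	case status == Stopped:
		return "delete dead task, mark container stopped"
	case !liveRestore:
		return "shut down container, mark stopped"
	case status == Paused:
		return "keep paused"
	default:
		return "adopt running task, restore health checks"
	}
}

func main() {
	fmt.Println(decide(Running, true)) // adopt running task, restore health checks
}
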
diff --git a/daemon/daemon_unix.go b/daemon/daemon_unix.go
index d0b71d1c65..acc2e85ba6 100644
--- a/daemon/daemon_unix.go
+++ b/daemon/daemon_unix.go
@@ -1387,10 +1387,13 @@ func copyBlkioEntry(entries []*statsV1.BlkIOEntry) []types.BlkioStatEntry {
}
func (daemon *Daemon) stats(c *container.Container) (*types.StatsJSON, error) {
- if !c.IsRunning() {
- return nil, errNotRunning(c.ID)
+ c.Lock()
+ task, err := c.GetRunningTask()
+ c.Unlock()
+ if err != nil {
+ return nil, err
}
- cs, err := daemon.containerd.Stats(context.Background(), c.ID)
+ cs, err := task.Stats(context.Background())
if err != nil {
if strings.Contains(err.Error(), "container not found") {
return nil, containerNotFound(c.ID)
diff --git a/daemon/daemon_windows.go b/daemon/daemon_windows.go
index 3dc8e09aeb..12f014c890 100644
--- a/daemon/daemon_windows.go
+++ b/daemon/daemon_windows.go
@@ -14,6 +14,7 @@ import (
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/container"
"github.com/docker/docker/daemon/config"
+ "github.com/docker/docker/errdefs"
"github.com/docker/docker/libcontainerd/local"
"github.com/docker/docker/libcontainerd/remote"
"github.com/docker/docker/libnetwork"
@@ -515,14 +516,17 @@ func driverOptions(_ *config.Config) nwconfig.Option {
}
func (daemon *Daemon) stats(c *container.Container) (*types.StatsJSON, error) {
- if !c.IsRunning() {
- return nil, errNotRunning(c.ID)
+ c.Lock()
+ task, err := c.GetRunningTask()
+ c.Unlock()
+ if err != nil {
+ return nil, err
}
// Obtain the stats from HCS via libcontainerd
- stats, err := daemon.containerd.Stats(context.Background(), c.ID)
+ stats, err := task.Stats(context.Background())
if err != nil {
- if strings.Contains(err.Error(), "container not found") {
+ if errdefs.IsNotFound(err) {
return nil, containerNotFound(c.ID)
}
return nil, err
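
The Windows stats path also swaps strings.Contains(err.Error(), ...) for errdefs.IsNotFound(err). Matching on error text breaks whenever a message is reworded or the error is wrapped; classifying by behavior survives both. A runnable toy of the same pattern that assumes nothing about errdefs internals:

package main

import (
	"errors"
	"fmt"
)

// Toy typed error, standing in for errdefs-style classification.
type notFound struct{ error }

func (notFound) NotFound() {}

func IsNotFound(err error) bool {
	var nf interface{ NotFound() }
	return errors.As(err, &nf)
}

func main() {
	err := notFound{errors.New("task xyz: not found")}
	// Before: strings.Contains(err.Error(), "container not found"), which is brittle.
	// After: classification survives message changes and wrapping.
	fmt.Println(IsNotFound(fmt.Errorf("stats: %w", err))) // true
}
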
diff --git a/daemon/delete.go b/daemon/delete.go
index db04705bef..e10c668352 100644
--- a/daemon/delete.go
+++ b/daemon/delete.go
@@ -138,7 +138,14 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, config ty
container.RWLayer = nil
}
- if err := containerfs.EnsureRemoveAll(container.Root); err != nil {
+ // Hold the container lock while deleting the container root directory
+ // so that other goroutines don't attempt to concurrently open files
+ // within it. Having any file open on Windows (without the
+ // FILE_SHARE_DELETE flag) will block it from being deleted.
+ container.Lock()
+ err := containerfs.EnsureRemoveAll(container.Root)
+ container.Unlock()
+ if err != nil {
err = errors.Wrapf(err, "unable to remove filesystem for %s", container.ID)
container.SetRemovalError(err)
return err
diff --git a/daemon/delete_test.go b/daemon/delete_test.go
index c95309e012..de7bbdc486 100644
--- a/daemon/delete_test.go
+++ b/daemon/delete_test.go
@@ -52,7 +52,7 @@ func TestContainerDelete(t *testing.T) {
fixMsg: "Stop the container before attempting removal or force remove",
initContainer: func() *container.Container {
c := newContainerWithState(container.NewState())
- c.SetRunning(0, true)
+ c.SetRunning(nil, nil, true)
c.SetRestarting(&container.ExitStatus{})
return c
}},
diff --git a/daemon/exec.go b/daemon/exec.go
index 4675ee4557..d4e5ab3df2 100644
--- a/daemon/exec.go
+++ b/daemon/exec.go
@@ -2,18 +2,19 @@ package daemon // import "github.com/docker/docker/daemon"
import (
"context"
+ "encoding/json"
"fmt"
"io"
"runtime"
"strings"
"time"
+ "github.com/containerd/containerd"
"github.com/docker/docker/api/types"
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/strslice"
"github.com/docker/docker/container"
"github.com/docker/docker/container/stream"
- "github.com/docker/docker/daemon/exec"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/pools"
"github.com/moby/sys/signal"
@@ -23,7 +24,7 @@ import (
"github.com/sirupsen/logrus"
)
-func (daemon *Daemon) registerExecCommand(container *container.Container, config *exec.Config) {
+func (daemon *Daemon) registerExecCommand(container *container.Container, config *container.ExecConfig) {
// Storing execs in container in order to kill them gracefully whenever the container is stopped or removed.
container.ExecCommands.Add(config.ID, config)
// Storing execs in daemon for easy access via Engine API.
@@ -41,7 +42,7 @@ func (daemon *Daemon) ExecExists(name string) (bool, error) {
// getExecConfig looks up the exec instance by name. If the container associated
// with the exec instance is stopped or paused, it will return an error.
-func (daemon *Daemon) getExecConfig(name string) (*exec.Config, error) {
+func (daemon *Daemon) getExecConfig(name string) (*container.ExecConfig, error) {
ec := daemon.execCommands.Get(name)
if ec == nil {
return nil, errExecNotFound(name)
@@ -52,7 +53,7 @@ func (daemon *Daemon) getExecConfig(name string) (*exec.Config, error) {
// saying the container isn't running, we should return a 404 so that
// the user sees the same error now that they will after the
// 5 minute clean-up loop is run which erases old/dead execs.
- ctr := daemon.containers.Get(ec.ContainerID)
+ ctr := daemon.containers.Get(ec.Container.ID)
if ctr == nil {
return nil, containerNotFound(name)
}
@@ -68,9 +69,9 @@ func (daemon *Daemon) getExecConfig(name string) (*exec.Config, error) {
return ec, nil
}
-func (daemon *Daemon) unregisterExecCommand(container *container.Container, execConfig *exec.Config) {
- container.ExecCommands.Delete(execConfig.ID, execConfig.Pid)
- daemon.execCommands.Delete(execConfig.ID, execConfig.Pid)
+func (daemon *Daemon) unregisterExecCommand(container *container.Container, execConfig *container.ExecConfig) {
+ container.ExecCommands.Delete(execConfig.ID)
+ daemon.execCommands.Delete(execConfig.ID)
}
func (daemon *Daemon) getActiveContainer(name string) (*container.Container, error) {
@@ -110,11 +111,10 @@ func (daemon *Daemon) ContainerExecCreate(name string, config *types.ExecConfig)
}
}
- execConfig := exec.NewConfig()
+ execConfig := container.NewExecConfig(cntr)
execConfig.OpenStdin = config.AttachStdin
execConfig.OpenStdout = config.AttachStdout
execConfig.OpenStderr = config.AttachStderr
- execConfig.ContainerID = cntr.ID
execConfig.DetachKeys = keys
execConfig.Entrypoint = entrypoint
execConfig.Args = args
@@ -174,27 +174,23 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
ec.Running = true
ec.Unlock()
- c := daemon.containers.Get(ec.ContainerID)
- if c == nil {
- return containerNotFound(ec.ContainerID)
- }
- logrus.Debugf("starting exec command %s in container %s", ec.ID, c.ID)
+ logrus.Debugf("starting exec command %s in container %s", ec.ID, ec.Container.ID)
attributes := map[string]string{
"execID": ec.ID,
}
- daemon.LogContainerEventWithAttributes(c, "exec_start: "+ec.Entrypoint+" "+strings.Join(ec.Args, " "), attributes)
+ daemon.LogContainerEventWithAttributes(ec.Container, "exec_start: "+ec.Entrypoint+" "+strings.Join(ec.Args, " "), attributes)
defer func() {
if err != nil {
ec.Lock()
+ ec.Container.ExecCommands.Delete(ec.ID)
ec.Running = false
exitCode := 126
ec.ExitCode = &exitCode
if err := ec.CloseStreams(); err != nil {
- logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
+ logrus.Errorf("failed to cleanup exec %s streams: %s", ec.Container.ID, err)
}
ec.Unlock()
- c.ExecCommands.Delete(ec.ID, ec.Pid)
}
}()
@@ -222,15 +218,18 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
p := &specs.Process{}
if runtime.GOOS != "windows" {
- ctr, err := daemon.containerdCli.LoadContainer(ctx, ec.ContainerID)
+ ctr, err := daemon.containerdCli.LoadContainer(ctx, ec.Container.ID)
if err != nil {
return err
}
- spec, err := ctr.Spec(ctx)
+ md, err := ctr.Info(ctx, containerd.WithoutRefreshedMetadata)
if err != nil {
return err
}
- p = spec.Process
+ spec := specs.Spec{Process: p}
+ if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
+ return err
+ }
}
p.Args = append([]string{ec.Entrypoint}, ec.Args...)
p.Env = ec.Env
@@ -253,7 +252,7 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
p.Cwd = "/"
}
- if err := daemon.execSetPlatformOpt(c, ec, p); err != nil {
+ if err := daemon.execSetPlatformOpt(ctx, ec, p); err != nil {
return err
}
@@ -274,31 +273,34 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
defer cancel()
attachErr := ec.StreamConfig.CopyStreams(copyCtx, &attachConfig)
+ ec.Container.Lock()
+ tsk, err := ec.Container.GetRunningTask()
+ ec.Container.Unlock()
+ if err != nil {
+ return err
+ }
+
// Synchronize with libcontainerd event loop
ec.Lock()
- c.ExecCommands.Lock()
- systemPid, err := daemon.containerd.Exec(ctx, c.ID, ec.ID, p, cStdin != nil, ec.InitializeStdio)
+ ec.Process, err = tsk.Exec(ctx, ec.ID, p, cStdin != nil, ec.InitializeStdio)
// the exec context should be ready, or error happened.
// close the chan to notify readiness
close(ec.Started)
if err != nil {
- c.ExecCommands.Unlock()
- ec.Unlock()
+ defer ec.Unlock()
return translateContainerdStartErr(ec.Entrypoint, ec.SetExitCode, err)
}
- ec.Pid = systemPid
- c.ExecCommands.Unlock()
ec.Unlock()
select {
case <-ctx.Done():
log := logrus.
- WithField("container", c.ID).
- WithField("exec", name)
+ WithField("container", ec.Container.ID).
+ WithField("exec", ec.ID)
log.Debug("Sending KILL signal to container process")
sigCtx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFunc()
- err := daemon.containerd.SignalProcess(sigCtx, c.ID, name, signal.SignalMap["KILL"])
+ err := ec.Process.Kill(sigCtx, signal.SignalMap["KILL"])
if err != nil {
log.WithError(err).Error("Could not send KILL signal to container process")
}
@@ -311,7 +313,7 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
attributes := map[string]string{
"execID": ec.ID,
}
- daemon.LogContainerEventWithAttributes(c, "exec_detach", attributes)
+ daemon.LogContainerEventWithAttributes(ec.Container, "exec_detach", attributes)
}
}
return nil
@@ -328,7 +330,7 @@ func (daemon *Daemon) execCommandGC() {
for id, config := range daemon.execCommands.Commands() {
if config.CanRemove {
cleaned++
- daemon.execCommands.Delete(id, config.Pid)
+ daemon.execCommands.Delete(id)
} else {
if _, exists := liveExecCommands[id]; !exists {
config.CanRemove = true
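
The exec-start hunk above replaces ctr.Spec(ctx) with ctr.Info(ctx, containerd.WithoutRefreshedMetadata) plus a local json.Unmarshal, which avoids a metadata-refresh round trip per exec, in line with the PR title. It leans on a json package subtlety: pre-seeding spec.Process with the caller's pointer makes Unmarshal fill that existing value in place, so p ends up populated. A self-contained demonstration of just that trick, with toy types:

package main

import (
	"encoding/json"
	"fmt"
)

type Process struct {
	Args []string `json:"args"`
	Cwd  string   `json:"cwd"`
}

type Spec struct {
	Process *Process `json:"process"`
}

func main() {
	p := &Process{}
	spec := Spec{Process: p} // pre-seed: the decoder fills *p in place
	data := []byte(`{"process":{"args":["sh"],"cwd":"/"}}`)
	if err := json.Unmarshal(data, &spec); err != nil {
		panic(err)
	}
	fmt.Println(p.Args, p.Cwd) // [sh] /  (p was populated via spec)
}
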
diff --git a/daemon/exec_linux.go b/daemon/exec_linux.go
index d0090d6097..46ed4309ff 100644
--- a/daemon/exec_linux.go
+++ b/daemon/exec_linux.go
@@ -5,15 +5,14 @@ import (
"github.com/containerd/containerd/pkg/apparmor"
"github.com/docker/docker/container"
- "github.com/docker/docker/daemon/exec"
"github.com/docker/docker/oci/caps"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
-func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config, p *specs.Process) error {
+func (daemon *Daemon) execSetPlatformOpt(ctx context.Context, ec *container.ExecConfig, p *specs.Process) error {
if len(ec.User) > 0 {
var err error
- p.User, err = getUser(c, ec.User)
+ p.User, err = getUser(ec.Container, ec.User)
if err != nil {
return err
}
@@ -27,9 +26,9 @@ func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config
}
if apparmor.HostSupports() {
var appArmorProfile string
- if c.AppArmorProfile != "" {
- appArmorProfile = c.AppArmorProfile
- } else if c.HostConfig.Privileged {
+ if ec.Container.AppArmorProfile != "" {
+ appArmorProfile = ec.Container.AppArmorProfile
+ } else if ec.Container.HostConfig.Privileged {
// `docker exec --privileged` does not currently disable AppArmor
// profiles. Privileged configuration of the container is inherited
appArmorProfile = unconfinedAppArmorProfile
@@ -51,5 +50,5 @@ func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config
p.ApparmorProfile = appArmorProfile
}
s := &specs.Spec{Process: p}
- return WithRlimits(daemon, c)(context.Background(), nil, nil, s)
+ return WithRlimits(daemon, ec.Container)(ctx, nil, nil, s)
}
diff --git a/daemon/exec_linux_test.go b/daemon/exec_linux_test.go
index ffef343898..17df7e16ad 100644
--- a/daemon/exec_linux_test.go
+++ b/daemon/exec_linux_test.go
@@ -4,13 +4,13 @@
package daemon
import (
+ "context"
"testing"
"github.com/containerd/containerd/pkg/apparmor"
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/container"
"github.com/docker/docker/daemon/config"
- "github.com/docker/docker/daemon/exec"
specs "github.com/opencontainers/runtime-spec/specs-go"
"gotest.tools/v3/assert"
)
@@ -79,10 +79,10 @@ func TestExecSetPlatformOptAppArmor(t *testing.T) {
Privileged: tc.privileged,
},
}
- ec := &exec.Config{Privileged: execPrivileged}
+ ec := &container.ExecConfig{Container: c, Privileged: execPrivileged}
p := &specs.Process{}
- err := d.execSetPlatformOpt(c, ec, p)
+ err := d.execSetPlatformOpt(context.Background(), ec, p)
assert.NilError(t, err)
assert.Equal(t, p.ApparmorProfile, tc.expectedProfile)
})
diff --git a/daemon/exec_windows.go b/daemon/exec_windows.go
index 32f16e9282..a4a8696aed 100644
--- a/daemon/exec_windows.go
+++ b/daemon/exec_windows.go
@@ -1,13 +1,14 @@
package daemon // import "github.com/docker/docker/daemon"
import (
+ "context"
+
"github.com/docker/docker/container"
- "github.com/docker/docker/daemon/exec"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
-func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config, p *specs.Process) error {
- if c.OS == "windows" {
+func (daemon *Daemon) execSetPlatformOpt(ctx context.Context, ec *container.ExecConfig, p *specs.Process) error {
+ if ec.Container.OS == "windows" {
p.User.Username = ec.User
}
return nil
diff --git a/daemon/health.go b/daemon/health.go
index 5f5779526b..2d94b3b1d9 100644
--- a/daemon/health.go
+++ b/daemon/health.go
@@ -13,7 +13,6 @@ import (
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/strslice"
"github.com/docker/docker/container"
- "github.com/docker/docker/daemon/exec"
"github.com/sirupsen/logrus"
)
@@ -69,11 +68,10 @@ func (p *cmdProbe) run(ctx context.Context, d *Daemon, cntr *container.Container
cmdSlice = append(getShell(cntr), cmdSlice...)
}
entrypoint, args := d.getEntrypointAndArgs(strslice.StrSlice{}, cmdSlice)
- execConfig := exec.NewConfig()
+ execConfig := container.NewExecConfig(cntr)
execConfig.OpenStdin = false
execConfig.OpenStdout = true
execConfig.OpenStderr = true
- execConfig.ContainerID = cntr.ID
execConfig.DetachKeys = []byte{}
execConfig.Entrypoint = entrypoint
execConfig.Args = args
@@ -151,14 +149,23 @@ func (p *cmdProbe) run(ctx context.Context, d *Daemon, cntr *container.Container
if err != nil {
return nil, err
}
- if info.ExitCode == nil {
- return nil, fmt.Errorf("healthcheck for container %s has no exit code", cntr.ID)
+ exitCode, err := func() (int, error) {
+ info.Lock()
+ defer info.Unlock()
+ if info.ExitCode == nil {
+ return 0, fmt.Errorf("healthcheck for container %s has no exit code", cntr.ID)
+ }
+ return *info.ExitCode, nil
+ }()
+ if err != nil {
+ return nil, err
}
// Note: Go's json package will handle invalid UTF-8 for us
out := output.String()
return &types.HealthcheckResult{
End: time.Now(),
- ExitCode: *info.ExitCode,
+ ExitCode: exitCode,
Output: out,
}, nil
}
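
The healthcheck change above wraps the exit-code read in an immediately-invoked closure so that defer info.Unlock() covers every return path. Note that an extra explicit Unlock inside the closure would double-unlock and abort at runtime with "sync: unlock of unlocked mutex", which is why the lock is released exactly once. The idiom in isolation, with a toy struct:

package main

import (
	"errors"
	"fmt"
	"sync"
)

type probeResult struct {
	sync.Mutex
	ExitCode *int
}

// readExitCode shows the scoped-lock closure: the deferred Unlock covers
// both the error return and the success return.
func readExitCode(r *probeResult) (int, error) {
	r.Lock()
	defer r.Unlock()
	if r.ExitCode == nil {
		return 0, errors.New("no exit code")
	}
	return *r.ExitCode, nil
}

func main() {
	code := 1
	fmt.Println(readExitCode(&probeResult{ExitCode: &code})) // 1 <nil>
}
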
diff --git a/daemon/inspect.go b/daemon/inspect.go
index 3fc3de2806..75be2ee2b3 100644
--- a/daemon/inspect.go
+++ b/daemon/inspect.go
@@ -218,11 +218,17 @@ func (daemon *Daemon) ContainerExecInspect(id string) (*backend.ExecInspect, err
return nil, errExecNotFound(id)
}
- if ctr := daemon.containers.Get(e.ContainerID); ctr == nil {
+ if ctr := daemon.containers.Get(e.Container.ID); ctr == nil {
return nil, errExecNotFound(id)
}
+ e.Lock()
+ defer e.Unlock()
pc := inspectExecProcessConfig(e)
+ var pid int
+ if e.Process != nil {
+ pid = int(e.Process.Pid())
+ }
return &backend.ExecInspect{
ID: e.ID,
@@ -233,9 +239,9 @@ func (daemon *Daemon) ContainerExecInspect(id string) (*backend.ExecInspect, err
OpenStdout: e.OpenStdout,
OpenStderr: e.OpenStderr,
CanRemove: e.CanRemove,
- ContainerID: e.ContainerID,
+ ContainerID: e.Container.ID,
DetachKeys: e.DetachKeys,
- Pid: e.Pid,
+ Pid: pid,
}, nil
}
diff --git a/daemon/inspect_linux.go b/daemon/inspect_linux.go
index 049a7f743f..9c2c513d0e 100644
--- a/daemon/inspect_linux.go
+++ b/daemon/inspect_linux.go
@@ -5,7 +5,6 @@ import (
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/api/types/versions/v1p19"
"github.com/docker/docker/container"
- "github.com/docker/docker/daemon/exec"
)
// This sets platform-specific fields
@@ -62,7 +61,7 @@ func (daemon *Daemon) containerInspectPre120(name string) (*v1p19.ContainerJSON,
}, nil
}
-func inspectExecProcessConfig(e *exec.Config) *backend.ExecProcessConfig {
+func inspectExecProcessConfig(e *container.ExecConfig) *backend.ExecProcessConfig {
return &backend.ExecProcessConfig{
Tty: e.Tty,
Entrypoint: e.Entrypoint,
diff --git a/daemon/inspect_test.go b/daemon/inspect_test.go
index 07c026b723..e55af45bea 100644
--- a/daemon/inspect_test.go
+++ b/daemon/inspect_test.go
@@ -6,7 +6,6 @@ import (
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/container"
"github.com/docker/docker/daemon/config"
- "github.com/docker/docker/daemon/exec"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
@@ -16,7 +15,7 @@ func TestGetInspectData(t *testing.T) {
ID: "inspect-me",
HostConfig: &containertypes.HostConfig{},
State: container.NewState(),
- ExecCommands: exec.NewStore(),
+ ExecCommands: container.NewExecStore(),
}
d := &Daemon{
diff --git a/daemon/inspect_windows.go b/daemon/inspect_windows.go
index 12fda670df..9b219d8b8c 100644
--- a/daemon/inspect_windows.go
+++ b/daemon/inspect_windows.go
@@ -4,7 +4,6 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/container"
- "github.com/docker/docker/daemon/exec"
)
// This sets platform-specific fields
@@ -17,7 +16,7 @@ func (daemon *Daemon) containerInspectPre120(name string) (*types.ContainerJSON,
return daemon.ContainerInspectCurrent(name, false)
}
-func inspectExecProcessConfig(e *exec.Config) *backend.ExecProcessConfig {
+func inspectExecProcessConfig(e *container.ExecConfig) *backend.ExecProcessConfig {
return &backend.ExecProcessConfig{
Tty: e.Tty,
Entrypoint: e.Entrypoint,
diff --git a/daemon/kill.go b/daemon/kill.go
index 383393e24f..953249c627 100644
--- a/daemon/kill.go
+++ b/daemon/kill.go
@@ -9,7 +9,6 @@ import (
containerpkg "github.com/docker/docker/container"
"github.com/docker/docker/errdefs"
- libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/moby/sys/signal"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -65,8 +64,9 @@ func (daemon *Daemon) killWithSignal(container *containerpkg.Container, stopSign
container.Lock()
defer container.Unlock()
- if !container.Running {
- return errNotRunning(container.ID)
+ task, err := container.GetRunningTask()
+ if err != nil {
+ return err
}
var unpause bool
@@ -96,8 +96,7 @@ func (daemon *Daemon) killWithSignal(container *containerpkg.Container, stopSign
return nil
}
- err := daemon.containerd.SignalProcess(context.Background(), container.ID, libcontainerdtypes.InitProcessName, stopSignal)
- if err != nil {
+ if err := task.Kill(context.Background(), stopSignal); err != nil {
if errdefs.IsNotFound(err) {
unpause = false
logrus.WithError(err).WithField("container", container.ID).WithField("action", "kill").Debug("container kill failed because of 'container not found' or 'no such process'")
@@ -121,7 +120,7 @@ func (daemon *Daemon) killWithSignal(container *containerpkg.Container, stopSign
if unpause {
// above kill signal will be sent once resume is finished
- if err := daemon.containerd.Resume(context.Background(), container.ID); err != nil {
+ if err := task.Resume(context.Background()); err != nil {
logrus.Warnf("Cannot unpause container %s: %s", container.ID, err)
}
}
diff --git a/daemon/monitor.go b/daemon/monitor.go
index 9a087283c8..1e96c1d9d8 100644
--- a/daemon/monitor.go
+++ b/daemon/monitor.go
@@ -7,6 +7,7 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/container"
+ "github.com/docker/docker/errdefs"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/docker/docker/restartmanager"
"github.com/pkg/errors"
@@ -25,28 +26,32 @@ func (daemon *Daemon) setStateCounter(c *container.Container) {
}
func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontainerdtypes.EventInfo) error {
+ var exitStatus container.ExitStatus
c.Lock()
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
- ec, et, err := daemon.containerd.DeleteTask(ctx, c.ID)
- cancel()
- if err != nil {
- logrus.WithError(err).WithField("container", c.ID).Warnf("failed to delete container from containerd")
+ tsk, ok := c.Task()
+ if ok {
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ es, err := tsk.Delete(ctx)
+ cancel()
+ if err != nil {
+ logrus.WithError(err).WithField("container", c.ID).Warnf("failed to delete container from containerd")
+ } else {
+ exitStatus = container.ExitStatus{
+ ExitCode: int(es.ExitCode()),
+ ExitedAt: es.ExitTime(),
+ }
+ }
}
- ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
c.StreamConfig.Wait(ctx)
cancel()
c.Reset(false)
- exitStatus := container.ExitStatus{
- ExitCode: int(ec),
- ExitedAt: et,
- }
if e != nil {
exitStatus.ExitCode = int(e.ExitCode)
exitStatus.ExitedAt = e.ExitedAt
- exitStatus.OOMKilled = e.OOMKilled
if e.Error != nil {
c.SetError(e.Error)
}
@@ -54,7 +59,7 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine
daemonShutdown := daemon.IsShuttingDown()
execDuration := time.Since(c.StartedAt)
- restart, wait, err := c.RestartManager().ShouldRestart(ec, daemonShutdown || c.HasBeenManuallyStopped, execDuration)
+ restart, wait, err := c.RestartManager().ShouldRestart(uint32(exitStatus.ExitCode), daemonShutdown || c.HasBeenManuallyStopped, execDuration)
if err != nil {
logrus.WithError(err).
WithField("container", c.ID).
@@ -71,7 +76,7 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine
// restarted if/when the container is started again
daemon.stopHealthchecks(c)
attributes := map[string]string{
- "exitCode": strconv.Itoa(int(ec)),
+ "exitCode": strconv.Itoa(exitStatus.ExitCode),
}
daemon.Cleanup(c)
@@ -141,6 +146,7 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
c.Lock()
defer c.Unlock()
+ c.OOMKilled = true
daemon.updateHealthMonitor(c)
if err := c.CheckpointTo(daemon.containersReplica); err != nil {
return err
@@ -157,6 +163,13 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
ec := int(ei.ExitCode)
execConfig.Lock()
defer execConfig.Unlock()
+
+ // Remove the exec command from the container's store only and not the
+ // daemon's store so that the exec command can be inspected. Remove it
+ // before mutating execConfig to maintain the invariant that
+ // c.ExecCommands only contains execs in the Running state.
+ c.ExecCommands.Delete(execConfig.ID)
+
execConfig.ExitCode = &ec
execConfig.Running = false
@@ -168,11 +181,16 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
}
- // remove the exec command from the container's store only and not the
- // daemon's store so that the exec command can be inspected.
- c.ExecCommands.Delete(execConfig.ID, execConfig.Pid)
-
exitCode = ec
+
+ go func() {
+ if _, err := execConfig.Process.Delete(context.Background()); err != nil {
+ logrus.WithError(err).WithFields(logrus.Fields{
+ "container": ei.ContainerID,
+ "process": ei.ProcessID,
+ }).Warn("failed to delete process")
+ }
+ }()
}
attributes := map[string]string{
"execID": ei.ProcessID,
@@ -185,7 +203,27 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
// This is here to handle start not generated by docker
if !c.Running {
- c.SetRunning(int(ei.Pid), false)
+ ctr, err := daemon.containerd.LoadContainer(context.Background(), c.ID)
+ if err != nil {
+ if errdefs.IsNotFound(err) {
+ // The container was started by not-docker and so could have been deleted by
+ // not-docker before we got around to loading it from containerd.
+ logrus.WithField("container", c.ID).WithError(err).
+ Debug("could not load containerd container for start event")
+ return nil
+ }
+ return err
+ }
+ tsk, err := ctr.Task(context.Background())
+ if err != nil {
+ if errdefs.IsNotFound(err) {
+ logrus.WithField("container", c.ID).WithError(err).
+ Debug("failed to load task for externally-started container")
+ return nil
+ }
+ return err
+ }
+ c.SetRunning(ctr, tsk, false)
c.HasBeenManuallyStopped = false
c.HasBeenStartedBefore = true
daemon.setStateCounter(c)
diff --git a/daemon/pause.go b/daemon/pause.go
index 51004e6c15..976531e527 100644
--- a/daemon/pause.go
+++ b/daemon/pause.go
@@ -24,8 +24,9 @@ func (daemon *Daemon) containerPause(container *container.Container) error {
defer container.Unlock()
// We cannot Pause the container which is not running
- if !container.Running {
- return errNotRunning(container.ID)
+ tsk, err := container.GetRunningTask()
+ if err != nil {
+ return err
}
// We cannot Pause the container which is already paused
@@ -38,8 +39,8 @@ func (daemon *Daemon) containerPause(container *container.Container) error {
return errContainerIsRestarting(container.ID)
}
- if err := daemon.containerd.Pause(context.Background(), container.ID); err != nil {
- return fmt.Errorf("Cannot pause container %s: %s", container.ID, err)
+ if err := tsk.Pause(context.Background()); err != nil {
+ return fmt.Errorf("cannot pause container %s: %s", container.ID, err)
}
container.Paused = true
diff --git a/daemon/resize.go b/daemon/resize.go
index ac9395379b..2fd427ae9e 100644
--- a/daemon/resize.go
+++ b/daemon/resize.go
@@ -4,8 +4,6 @@ import (
"context"
"fmt"
"time"
-
- libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
)
// ContainerResize changes the size of the TTY of the process running
@@ -16,11 +14,14 @@ func (daemon *Daemon) ContainerResize(name string, height, width int) error {
return err
}
- if !container.IsRunning() {
- return errNotRunning(container.ID)
+ container.Lock()
+ tsk, err := container.GetRunningTask()
+ container.Unlock()
+ if err != nil {
+ return err
}
- if err = daemon.containerd.ResizeTerminal(context.Background(), container.ID, libcontainerdtypes.InitProcessName, width, height); err == nil {
+ if err = tsk.Resize(context.Background(), uint32(width), uint32(height)); err == nil {
attributes := map[string]string{
"height": fmt.Sprintf("%d", height),
"width": fmt.Sprintf("%d", width),
@@ -46,7 +47,7 @@ func (daemon *Daemon) ContainerExecResize(name string, height, width int) error
select {
case <-ec.Started:
- return daemon.containerd.ResizeTerminal(context.Background(), ec.ContainerID, ec.ID, width, height)
+ return ec.Process.Resize(context.Background(), uint32(width), uint32(height))
case <-timeout.C:
return fmt.Errorf("timeout waiting for exec session ready")
}
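
ContainerExecResize above selects on ec.Started against a timer. Started is closed, not sent on, once the exec process is launched, so every current and future waiter unblocks at once. A runnable miniature of the same readiness pattern:

package main

import (
	"fmt"
	"time"
)

func main() {
	started := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond) // stand-in for launching the exec
		close(started)                    // wakes every waiter at once
	}()
	select {
	case <-started:
		fmt.Println("exec is ready; safe to resize its TTY")
	case <-time.After(time.Second):
		fmt.Println("timeout waiting for exec session ready")
	}
}
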
diff --git a/daemon/resize_test.go b/daemon/resize_test.go
index 50a96778a7..b17e1fc3d0 100644
--- a/daemon/resize_test.go
+++ b/daemon/resize_test.go
@@ -8,7 +8,7 @@ import (
"testing"
"github.com/docker/docker/container"
- "github.com/docker/docker/daemon/exec"
+ "github.com/docker/docker/libcontainerd/types"
"gotest.tools/v3/assert"
)
@@ -16,32 +16,28 @@ import (
func TestExecResizeNoSuchExec(t *testing.T) {
n := "TestExecResize"
d := &Daemon{
- execCommands: exec.NewStore(),
+ execCommands: container.NewExecStore(),
}
c := &container.Container{
- ExecCommands: exec.NewStore(),
+ ExecCommands: container.NewExecStore(),
}
- ec := &exec.Config{
- ID: n,
+ ec := &container.ExecConfig{
+ ID: n,
+ Container: c,
}
d.registerExecCommand(c, ec)
err := d.ContainerExecResize("nil", 24, 8)
assert.ErrorContains(t, err, "No such exec instance")
}
-type execResizeMockContainerdClient struct {
- MockContainerdClient
- ProcessID string
- ContainerID string
- Width int
- Height int
+type execResizeMockProcess struct {
+ types.Process
+ Width, Height int
}
-func (c *execResizeMockContainerdClient) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
- c.ProcessID = processID
- c.ContainerID = containerID
- c.Width = width
- c.Height = height
+func (p *execResizeMockProcess) Resize(ctx context.Context, width, height uint32) error {
+ p.Width = int(width)
+ p.Height = int(height)
return nil
}
@@ -50,30 +46,29 @@ func TestExecResize(t *testing.T) {
n := "TestExecResize"
width := 24
height := 8
- ec := &exec.Config{
- ID: n,
- ContainerID: n,
- Started: make(chan struct{}),
- }
- close(ec.Started)
- mc := &execResizeMockContainerdClient{}
+ mp := &execResizeMockProcess{}
d := &Daemon{
- execCommands: exec.NewStore(),
- containerd: mc,
+ execCommands: container.NewExecStore(),
containers: container.NewMemoryStore(),
}
c := &container.Container{
- ExecCommands: exec.NewStore(),
+ ID: n,
+ ExecCommands: container.NewExecStore(),
State: &container.State{Running: true},
}
+ ec := &container.ExecConfig{
+ ID: n,
+ Container: c,
+ Process: mp,
+ Started: make(chan struct{}),
+ }
+ close(ec.Started)
d.containers.Add(n, c)
d.registerExecCommand(c, ec)
err := d.ContainerExecResize(n, height, width)
assert.NilError(t, err)
- assert.Equal(t, mc.Width, width)
- assert.Equal(t, mc.Height, height)
- assert.Equal(t, mc.ProcessID, n)
- assert.Equal(t, mc.ContainerID, n)
+ assert.Equal(t, mp.Width, width)
+ assert.Equal(t, mp.Height, height)
}
// This test is to make sure that when exec context is not ready, a timeout error should happen.
@@ -82,21 +77,22 @@ func TestExecResizeTimeout(t *testing.T) {
n := "TestExecResize"
width := 24
height := 8
- ec := &exec.Config{
- ID: n,
- ContainerID: n,
- Started: make(chan struct{}),
- }
- mc := &execResizeMockContainerdClient{}
+ mp := &execResizeMockProcess{}
d := &Daemon{
- execCommands: exec.NewStore(),
- containerd: mc,
+ execCommands: container.NewExecStore(),
containers: container.NewMemoryStore(),
}
c := &container.Container{
- ExecCommands: exec.NewStore(),
+ ID: n,
+ ExecCommands: container.NewExecStore(),
State: &container.State{Running: true},
}
+ ec := &container.ExecConfig{
+ ID: n,
+ Container: c,
+ Process: mp,
+ Started: make(chan struct{}),
+ }
d.containers.Add(n, c)
d.registerExecCommand(c, ec)
err := d.ContainerExecResize(n, height, width)
diff --git a/daemon/start.go b/daemon/start.go
index ecc0f8b8af..bbdefb0173 100644
--- a/daemon/start.go
+++ b/daemon/start.go
@@ -9,6 +9,7 @@ import (
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/container"
"github.com/docker/docker/errdefs"
+ "github.com/docker/docker/libcontainerd"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -178,28 +179,17 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
ctx := context.TODO()
- err = daemon.containerd.Create(ctx, container.ID, spec, shim, createOptions)
+ ctr, err := libcontainerd.ReplaceContainer(ctx, daemon.containerd, container.ID, spec, shim, createOptions)
if err != nil {
- if errdefs.IsConflict(err) {
- logrus.WithError(err).WithField("container", container.ID).Error("Container not cleaned up from containerd from previous run")
- // best effort to clean up old container object
- daemon.containerd.DeleteTask(ctx, container.ID)
- if err := daemon.containerd.Delete(ctx, container.ID); err != nil && !errdefs.IsNotFound(err) {
- logrus.WithError(err).WithField("container", container.ID).Error("Error cleaning up stale containerd container object")
- }
- err = daemon.containerd.Create(ctx, container.ID, spec, shim, createOptions)
- }
- if err != nil {
- return translateContainerdStartErr(container.Path, container.SetExitCode, err)
- }
+ return translateContainerdStartErr(container.Path, container.SetExitCode, err)
}
// TODO(mlaventure): we need to specify checkpoint options here
- pid, err := daemon.containerd.Start(context.Background(), container.ID, checkpointDir,
+ tsk, err := ctr.Start(ctx, checkpointDir,
container.StreamConfig.Stdin() != nil || container.Config.Tty,
container.InitializeStdio)
if err != nil {
- if err := daemon.containerd.Delete(context.Background(), container.ID); err != nil {
+ if err := ctr.Delete(context.Background()); err != nil {
logrus.WithError(err).WithField("container", container.ID).
Error("failed to delete failed start container")
}
@@ -207,7 +197,7 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
}
container.HasBeenManuallyRestarted = false
- container.SetRunning(pid, true)
+ container.SetRunning(ctr, tsk, true)
container.HasBeenStartedBefore = true
daemon.setStateCounter(container)
@@ -227,6 +217,14 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
// Cleanup releases any network resources allocated to the container along with any rules
// around how containers are linked together. It also unmounts the container's root filesystem.
func (daemon *Daemon) Cleanup(container *container.Container) {
+ // Microsoft HCS containers get in a bad state if host resources are
+ // released while the container still exists.
+ if ctr, ok := container.C8dContainer(); ok {
+ if err := ctr.Delete(context.Background()); err != nil {
+ logrus.Errorf("%s cleanup: failed to delete container from containerd: %v", container.ID, err)
+ }
+ }
+
daemon.releaseNetwork(container)
if err := container.UnmountIpcMount(); err != nil {
@@ -260,8 +258,4 @@ func (daemon *Daemon) Cleanup(container *container.Container) {
}
container.CancelAttachContext()
-
- if err := daemon.containerd.Delete(context.Background(), container.ID); err != nil {
- logrus.Errorf("%s cleanup: failed to delete container from containerd: %v", container.ID, err)
- }
}
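
containerStart now delegates the conflict-recovery dance to libcontainerd.ReplaceContainer (libcontainerd/replace.go appears in the diffstat but not in this excerpt). Judging from the inline code it replaces, it presumably attempts creation and, on a conflict, deletes the stale container object and retries. A hypothetical sketch under that assumption, with toy interfaces (the real function also takes the OCI spec, shim, and create options):

package main

import (
	"context"
	"errors"
)

// Minimal hypothetical client surface; the real names live in libcontainerd/types.
type Container interface{ Delete(context.Context) error }

type Client interface {
	NewContainer(ctx context.Context, id string) (Container, error)
	LoadContainer(ctx context.Context, id string) (Container, error)
}

var errConflict = errors.New("id already in use")

// replaceContainer sketches the delete-stale-then-retry logic removed
// from containerStart above.
func replaceContainer(ctx context.Context, cli Client, id string) (Container, error) {
	ctr, err := cli.NewContainer(ctx, id)
	if !errors.Is(err, errConflict) {
		return ctr, err // created, or failed for a non-conflict reason
	}
	if old, lerr := cli.LoadContainer(ctx, id); lerr == nil {
		_ = old.Delete(ctx) // best-effort cleanup of the stale object
	}
	return cli.NewContainer(ctx, id)
}

func main() {}
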
diff --git a/daemon/top_unix.go b/daemon/top_unix.go
index 0287acaf7a..68da2596e4 100644
--- a/daemon/top_unix.go
+++ b/daemon/top_unix.go
@@ -14,6 +14,7 @@ import (
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/errdefs"
+ libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/pkg/errors"
)
@@ -150,18 +151,31 @@ func (daemon *Daemon) ContainerTop(name string, psArgs string) (*container.Conta
return nil, err
}
- if !ctr.IsRunning() {
- return nil, errNotRunning(ctr.ID)
- }
+ tsk, err := func() (libcontainerdtypes.Task, error) {
+ ctr.Lock()
+ defer ctr.Unlock()
- if ctr.IsRestarting() {
- return nil, errContainerIsRestarting(ctr.ID)
+ tsk, err := ctr.GetRunningTask()
+ if err != nil {
+ return nil, err
+ }
+ if ctr.Restarting {
+ return nil, errContainerIsRestarting(ctr.ID)
+ }
+ return tsk, nil
+ }()
+ if err != nil {
+ return nil, err
}
- procs, err := daemon.containerd.ListPids(context.Background(), ctr.ID)
+ infos, err := tsk.Pids(context.Background())
if err != nil {
return nil, err
}
+ procs := make([]uint32, len(infos))
+ for i, p := range infos {
+ procs[i] = p.Pid
+ }
args := strings.Split(psArgs, " ")
pids := psPidsArg(procs)
diff --git a/daemon/top_windows.go b/daemon/top_windows.go
index eaaad4f771..203a5b7c62 100644
--- a/daemon/top_windows.go
+++ b/daemon/top_windows.go
@@ -7,6 +7,7 @@ import (
"time"
containertypes "github.com/docker/docker/api/types/container"
+ libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
units "github.com/docker/go-units"
)
@@ -36,15 +37,21 @@ func (daemon *Daemon) ContainerTop(name string, psArgs string) (*containertypes.
return nil, err
}
- if !container.IsRunning() {
- return nil, errNotRunning(container.ID)
- }
+ task, err := func() (libcontainerdtypes.Task, error) {
+ container.Lock()
+ defer container.Unlock()
- if container.IsRestarting() {
- return nil, errContainerIsRestarting(container.ID)
- }
+ task, err := container.GetRunningTask()
+ if err != nil {
+ return nil, err
+ }
+ if container.Restarting {
+ return nil, errContainerIsRestarting(container.ID)
+ }
+ return task, nil
+ }()
- s, err := daemon.containerd.Summary(context.Background(), container.ID)
+ s, err := task.Summary(context.Background())
if err != nil {
return nil, err
}
diff --git a/daemon/unpause.go b/daemon/unpause.go
index fbcf7a589e..eb52256771 100644
--- a/daemon/unpause.go
+++ b/daemon/unpause.go
@@ -26,8 +26,12 @@ func (daemon *Daemon) containerUnpause(ctr *container.Container) error {
if !ctr.Paused {
return fmt.Errorf("Container %s is not paused", ctr.ID)
}
+ tsk, err := ctr.GetRunningTask()
+ if err != nil {
+ return err
+ }
- if err := daemon.containerd.Resume(context.Background(), ctr.ID); err != nil {
+ if err := tsk.Resume(context.Background()); err != nil {
return fmt.Errorf("Cannot unpause container %s: %s", ctr.ID, err)
}
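
The mechanical shape of the refactor is visible here in miniature: a daemon-wide client call keyed by container ID becomes a method on a task handle resolved once. A side-by-side sketch, with both forms taken from the hunk above:

    // Before: the client re-resolves the container inside every RPC.
    err := daemon.containerd.Resume(context.Background(), ctr.ID)

    // After: resolve the running task once, then talk to it directly.
    tsk, err := ctr.GetRunningTask()
    if err != nil {
        return err
    }
    err = tsk.Resume(context.Background())
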
diff --git a/daemon/update.go b/daemon/update.go
index bd8479fc05..f01635e49e 100644
--- a/daemon/update.go
+++ b/daemon/update.go
@@ -74,19 +74,28 @@ func (daemon *Daemon) update(name string, hostConfig *container.HostConfig) erro
ctr.UpdateMonitor(hostConfig.RestartPolicy)
}
+ defer daemon.LogContainerEvent(ctr, "update")
+
// If container is not running, update hostConfig struct is enough,
// resources will be updated when the container is started again.
// If container is running (including paused), we need to update configs
// to the real world.
- if ctr.IsRunning() && !ctr.IsRestarting() {
- if err := daemon.containerd.UpdateResources(context.Background(), ctr.ID, toContainerdResources(hostConfig.Resources)); err != nil {
- restoreConfig = true
- // TODO: it would be nice if containerd responded with better errors here so we can classify this better.
- return errCannotUpdate(ctr.ID, errdefs.System(err))
- }
+ ctr.Lock()
+ isRestarting := ctr.Restarting
+ tsk, err := ctr.GetRunningTask()
+ ctr.Unlock()
+ if errdefs.IsConflict(err) || isRestarting {
+ return nil
+ }
+ if err != nil {
+ return err
}
- daemon.LogContainerEvent(ctr, "update")
+ if err := tsk.UpdateResources(context.TODO(), toContainerdResources(hostConfig.Resources)); err != nil {
+ restoreConfig = true
+ // TODO: it would be nice if containerd responded with better errors here so we can classify this better.
+ return errCannotUpdate(ctr.ID, errdefs.System(err))
+ }
return nil
}
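
Note the changed failure semantics: previously a container that stopped mid-update would surface an error, whereas now a conflict from GetRunningTask (or a restart in flight) is treated as success, because the updated hostConfig has already been persisted and will be applied on the next start. A condensed decision table, assuming GetRunningTask reports an errdefs.Conflict when no task is running (implied by the hunk above):

    ctr.Lock()
    isRestarting := ctr.Restarting
    tsk, err := ctr.GetRunningTask()
    ctr.Unlock()
    switch {
    case errdefs.IsConflict(err) || isRestarting:
        return nil // config persisted; applied when the container next starts
    case err != nil:
        return err
    }
    return tsk.UpdateResources(context.TODO(), toContainerdResources(hostConfig.Resources))
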
diff --git a/daemon/util_test.go b/daemon/util_test.go
deleted file mode 100644
index 5ac47fef3b..0000000000
--- a/daemon/util_test.go
+++ /dev/null
@@ -1,74 +0,0 @@
-//go:build linux
-// +build linux
-
-package daemon
-
-import (
- "context"
- "syscall"
- "time"
-
- "github.com/containerd/containerd"
- libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
- specs "github.com/opencontainers/runtime-spec/specs-go"
-)
-
-type mockProcess struct {
-}
-
-func (m *mockProcess) Delete(_ context.Context) (uint32, time.Time, error) {
- return 0, time.Time{}, nil
-}
-
-// Mock containerd client implementation, for unit tests.
-type MockContainerdClient struct {
-}
-
-func (c *MockContainerdClient) Version(ctx context.Context) (containerd.Version, error) {
- return containerd.Version{}, nil
-}
-func (c *MockContainerdClient) Restore(ctx context.Context, containerID string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) {
- return false, 0, &mockProcess{}, nil
-}
-func (c *MockContainerdClient) Create(ctx context.Context, containerID string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error {
- return nil
-}
-func (c *MockContainerdClient) Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (pid int, err error) {
- return 0, nil
-}
-func (c *MockContainerdClient) SignalProcess(ctx context.Context, containerID, processID string, signal syscall.Signal) error {
- return nil
-}
-func (c *MockContainerdClient) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
- return 0, nil
-}
-func (c *MockContainerdClient) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
- return nil
-}
-func (c *MockContainerdClient) CloseStdin(ctx context.Context, containerID, processID string) error {
- return nil
-}
-func (c *MockContainerdClient) Pause(ctx context.Context, containerID string) error { return nil }
-func (c *MockContainerdClient) Resume(ctx context.Context, containerID string) error { return nil }
-func (c *MockContainerdClient) Stats(ctx context.Context, containerID string) (*libcontainerdtypes.Stats, error) {
- return nil, nil
-}
-func (c *MockContainerdClient) ListPids(ctx context.Context, containerID string) ([]uint32, error) {
- return nil, nil
-}
-func (c *MockContainerdClient) Summary(ctx context.Context, containerID string) ([]libcontainerdtypes.Summary, error) {
- return nil, nil
-}
-func (c *MockContainerdClient) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
- return 0, time.Time{}, nil
-}
-func (c *MockContainerdClient) Delete(ctx context.Context, containerID string) error { return nil }
-func (c *MockContainerdClient) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) {
- return "null", nil
-}
-func (c *MockContainerdClient) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error {
- return nil
-}
-func (c *MockContainerdClient) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
- return nil
-}
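
The flat mock disappears along with the flat client interface. Tests against the new object-shaped surface can get by with far less boilerplate by embedding the interface and overriding only what a given test touches; a self-contained sketch under that assumption (`stubTask` is hypothetical, not part of the tree):

    package daemon_test

    import (
        "context"

        "github.com/containerd/containerd"
        libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
    )

    // stubTask embeds the Task interface so only the methods a given test
    // exercises need bodies; calling anything else panics loudly.
    type stubTask struct {
        libcontainerdtypes.Task
        pids []containerd.ProcessInfo
    }

    func (t stubTask) Pids(context.Context) ([]containerd.ProcessInfo, error) {
        return t.pids, nil
    }
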
diff --git a/integration/container/wait_test.go b/integration/container/wait_test.go
index 12a9313caa..f8e64f0458 100644
--- a/integration/container/wait_test.go
+++ b/integration/container/wait_test.go
@@ -108,28 +108,25 @@ func TestWaitConditions(t *testing.T) {
cli := request.NewAPIClient(t)
testCases := []struct {
- doc string
- waitCond containertypes.WaitCondition
- expectedCode int64
+ doc string
+ waitCond containertypes.WaitCondition
+ runOpts []func(*container.TestContainerConfig)
}{
{
- doc: "default",
- expectedCode: 99,
+ doc: "default",
},
{
- doc: "not-running",
- expectedCode: 99,
- waitCond: containertypes.WaitConditionNotRunning,
+ doc: "not-running",
+ waitCond: containertypes.WaitConditionNotRunning,
},
{
- doc: "next-exit",
- expectedCode: 99,
- waitCond: containertypes.WaitConditionNextExit,
+ doc: "next-exit",
+ waitCond: containertypes.WaitConditionNextExit,
},
{
- doc: "removed",
- expectedCode: 99,
- waitCond: containertypes.WaitConditionRemoved,
+ doc: "removed",
+ waitCond: containertypes.WaitConditionRemoved,
+ runOpts: []func(*container.TestContainerConfig){container.WithAutoRemove},
},
}
@@ -138,21 +135,44 @@ func TestWaitConditions(t *testing.T) {
t.Run(tc.doc, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
- opts := []func(*container.TestContainerConfig){
- container.WithCmd("sh", "-c", "sleep 1; exit 99"),
- }
- if tc.waitCond == containertypes.WaitConditionRemoved {
- opts = append(opts, container.WithAutoRemove)
- }
- containerID := container.Run(ctx, t, cli, opts...)
- poll.WaitOn(t, container.IsInState(ctx, cli, containerID, "running"), poll.WithTimeout(30*time.Second), poll.WithDelay(100*time.Millisecond))
+ opts := append([]func(*container.TestContainerConfig){
+ container.WithCmd("sh", "-c", "read -r; exit 99"),
+ func(tcc *container.TestContainerConfig) {
+ tcc.Config.AttachStdin = true
+ tcc.Config.OpenStdin = true
+ },
+ }, tc.runOpts...)
+ containerID := container.Create(ctx, t, cli, opts...)
+ t.Logf("ContainerID = %v", containerID)
+
+ streams, err := cli.ContainerAttach(ctx, containerID, types.ContainerAttachOptions{Stream: true, Stdin: true})
+ assert.NilError(t, err)
+ defer streams.Close()
+ assert.NilError(t, cli.ContainerStart(ctx, containerID, types.ContainerStartOptions{}))
waitResC, errC := cli.ContainerWait(ctx, containerID, tc.waitCond)
select {
case err := <-errC:
+ t.Fatalf("ContainerWait() err = %v", err)
+ case res := <-waitResC:
+ t.Fatalf("ContainerWait() sent exit code (%v) before ContainerStart()", res)
+ default:
+ }
+
+ info, _ := cli.ContainerInspect(ctx, containerID)
+ assert.Equal(t, "running", info.State.Status)
+
+ _, err = streams.Conn.Write([]byte("\n"))
+ assert.NilError(t, err)
+
+ select {
+ case err := <-errC:
assert.NilError(t, err)
case waitRes := <-waitResC:
- assert.Check(t, is.Equal(tc.expectedCode, waitRes.StatusCode))
+ assert.Check(t, is.Equal(int64(99), waitRes.StatusCode))
+ case <-time.After(15 * time.Second):
+ info, _ := cli.ContainerInspect(ctx, containerID)
+ t.Fatalf("Timed out waiting for container exit code (status = %q)", info.State.Status)
}
})
}
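
The rewritten test replaces a timing assumption (sleep 1 and hope the wait is registered in time) with explicit synchronization: the container blocks in read -r until the test writes a newline over the attached stdin, so ContainerWait is provably registered before the container can exit. The essential pattern, condensed from the hunk above:

    streams, err := cli.ContainerAttach(ctx, containerID,
        types.ContainerAttachOptions{Stream: true, Stdin: true})
    assert.NilError(t, err)
    defer streams.Close()

    assert.NilError(t, cli.ContainerStart(ctx, containerID, types.ContainerStartOptions{}))
    waitResC, errC := cli.ContainerWait(ctx, containerID, tc.waitCond)

    // Only now release the container; exit code 99 must arrive on waitResC.
    _, err = streams.Conn.Write([]byte("\n"))
    assert.NilError(t, err)

waitResC and errC are then consumed by the select statements shown in the hunk.
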
diff --git a/libcontainerd/local/local_windows.go b/libcontainerd/local/local_windows.go
index 5cd075d51e..da71805dbd 100644
--- a/libcontainerd/local/local_windows.go
+++ b/libcontainerd/local/local_windows.go
@@ -32,29 +32,44 @@ import (
)
type process struct {
- id string
- pid int
- hcsProcess hcsshim.Process
+ // mu guards the mutable fields of this struct.
+ //
+ // Always lock mu before ctr's mutex to prevent deadlocks.
+ mu sync.Mutex
+ id string // Invariants: immutable
+ ctr *container // Invariants: immutable, ctr != nil
+ hcsProcess hcsshim.Process // Is set to nil on process exit
+ exited *containerd.ExitStatus // Valid iff waitCh is closed
+ waitCh chan struct{}
+}
+
+type task struct {
+ process
}
type container struct {
- sync.Mutex
+ mu sync.Mutex
// The ociSpec is required, as client.Create() needs a spec, but can
// be called from the RestartManager context which does not otherwise
// have access to the Spec
+ //
+ // A container value with ociSpec == nil represents a container which
+ // has been loaded with (*client).LoadContainer, and is ineligible to
+ // be Start()ed.
ociSpec *specs.Spec
- hcsContainer hcsshim.Container
+ hcsContainer hcsshim.Container // Is set to nil on container delete
+ isPaused bool
+ client *client
id string
- status containerd.ProcessStatus
- exitedAt time.Time
- exitCode uint32
- waitCh chan struct{}
- init *process
- execs map[string]*process
terminateInvoked bool
+
+ // task is a reference to the current task for the container. As a
+ // corollary, when task == nil the container has no current task: the
+ // container was never Start()ed or the task was Delete()d.
+ task *task
}
// defaultOwner is a tag passed to HCS to allow it to differentiate between
@@ -63,22 +78,18 @@ type container struct {
const defaultOwner = "docker"
type client struct {
- sync.Mutex
-
- stateDir string
- backend libcontainerdtypes.Backend
- logger *logrus.Entry
- eventQ queue.Queue
- containers map[string]*container
+ stateDir string
+ backend libcontainerdtypes.Backend
+ logger *logrus.Entry
+ eventQ queue.Queue
}
// NewClient creates a new local executor for windows
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
c := &client{
- stateDir: stateDir,
- backend: b,
- logger: logrus.WithField("module", "libcontainerd").WithField("module", "libcontainerd").WithField("namespace", ns),
- containers: make(map[string]*container),
+ stateDir: stateDir,
+ backend: b,
+ logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns),
}
return c, nil
@@ -88,7 +99,7 @@ func (c *client) Version(ctx context.Context) (containerd.Version, error) {
return containerd.Version{}, errors.New("not implemented on Windows")
}
-// Create is the entrypoint to create a container from a spec.
+// NewContainer is the entrypoint to create a container from a spec.
// The table below shows the fields required for HCS JSON calling parameters;
// fields that are not populated are omitted.
// +-----------------+--------------------------------------------+---------------------------------------------------+
@@ -139,16 +150,12 @@ func (c *client) Version(ctx context.Context) (containerd.Version, error) {
// "ImagePath": "C:\\\\control\\\\windowsfilter\\\\65bf96e5760a09edf1790cb229e2dfb2dbd0fcdc0bf7451bae099106bfbfea0c\\\\UtilityVM"
// },
// }
-func (c *client) Create(_ context.Context, id string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error {
- if ctr := c.getContainer(id); ctr != nil {
- return errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
- }
-
+func (c *client) NewContainer(_ context.Context, id string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (libcontainerdtypes.Container, error) {
var err error
if spec.Linux != nil {
- return errors.New("linux containers are not supported on this platform")
+ return nil, errors.New("linux containers are not supported on this platform")
}
- err = c.createWindows(id, spec, runtimeOptions)
+ ctr, err := c.createWindows(id, spec, runtimeOptions)
if err == nil {
c.eventQ.Append(id, func() {
@@ -168,10 +175,10 @@ func (c *client) Create(_ context.Context, id string, spec *specs.Spec, shim str
}
})
}
- return err
+ return ctr, err
}
-func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions interface{}) error {
+func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions interface{}) (*container, error) {
logger := c.logger.WithField("container", id)
configuration := &hcsshim.ContainerConfig{
SystemType: "Container",
@@ -215,7 +222,7 @@ func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions inter
// We must have at least two layers in the spec, the bottom one being a
// base image, the top one being the RW layer.
if spec.Windows.LayerFolders == nil || len(spec.Windows.LayerFolders) < 2 {
- return fmt.Errorf("OCI spec is invalid - at least two LayerFolders must be supplied to the runtime")
+ return nil, fmt.Errorf("OCI spec is invalid - at least two LayerFolders must be supplied to the runtime")
}
// Strip off the top-most layer as that's passed in separately to HCS
@@ -226,7 +233,7 @@ func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions inter
// We don't currently support setting the utility VM image explicitly.
// TODO circa RS5, this may be re-locatable.
if spec.Windows.HyperV.UtilityVMPath != "" {
- return errors.New("runtime does not support an explicit utility VM path for Hyper-V containers")
+ return nil, errors.New("runtime does not support an explicit utility VM path for Hyper-V containers")
}
// Find the upper-most utility VM image.
@@ -239,35 +246,35 @@ func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions inter
break
}
if !os.IsNotExist(err) {
- return err
+ return nil, err
}
}
if uvmImagePath == "" {
- return errors.New("utility VM image could not be found")
+ return nil, errors.New("utility VM image could not be found")
}
configuration.HvRuntime = &hcsshim.HvRuntime{ImagePath: uvmImagePath}
if spec.Root.Path != "" {
- return errors.New("OCI spec is invalid - Root.Path must be omitted for a Hyper-V container")
+ return nil, errors.New("OCI spec is invalid - Root.Path must be omitted for a Hyper-V container")
}
} else {
const volumeGUIDRegex = `^\\\\\?\\(Volume)\{{0,1}[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}(\}){0,1}\}\\$`
if _, err := regexp.MatchString(volumeGUIDRegex, spec.Root.Path); err != nil {
- return fmt.Errorf(`OCI spec is invalid - Root.Path '%s' must be a volume GUID path in the format '\\?\Volume{GUID}\'`, spec.Root.Path)
+ return nil, fmt.Errorf(`OCI spec is invalid - Root.Path '%s' must be a volume GUID path in the format '\\?\Volume{GUID}\'`, spec.Root.Path)
}
// HCS API requires the trailing backslash to be removed
configuration.VolumePath = spec.Root.Path[:len(spec.Root.Path)-1]
}
if spec.Root.Readonly {
- return errors.New(`OCI spec is invalid - Root.Readonly must not be set on Windows`)
+ return nil, errors.New(`OCI spec is invalid - Root.Readonly must not be set on Windows`)
}
for _, layerPath := range layerFolders {
_, filename := filepath.Split(layerPath)
g, err := hcsshim.NameToGuid(filename)
if err != nil {
- return err
+ return nil, err
}
configuration.Layers = append(configuration.Layers, hcsshim.Layer{
ID: g.ToString(),
@@ -281,7 +288,7 @@ func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions inter
for _, mount := range spec.Mounts {
const pipePrefix = `\\.\pipe\`
if mount.Type != "" {
- return fmt.Errorf("OCI spec is invalid - Mount.Type '%s' must not be set", mount.Type)
+ return nil, fmt.Errorf("OCI spec is invalid - Mount.Type '%s' must not be set", mount.Type)
}
if strings.HasPrefix(mount.Destination, pipePrefix) {
mp := hcsshim.MappedPipe{
@@ -309,13 +316,13 @@ func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions inter
if len(spec.Windows.Devices) > 0 {
// Add any device assignments
if configuration.HvPartition {
- return errors.New("device assignment is not supported for HyperV containers")
+ return nil, errors.New("device assignment is not supported for HyperV containers")
}
for _, d := range spec.Windows.Devices {
// Per https://github.com/microsoft/hcsshim/blob/v0.9.2/internal/uvm/virtual_device.go#L17-L18,
// these represent an Interface Class GUID.
if d.IDType != "class" && d.IDType != "vpci-class-guid" {
- return errors.Errorf("device assignment of type '%s' is not supported", d.IDType)
+ return nil, errors.Errorf("device assignment of type '%s' is not supported", d.IDType)
}
configuration.AssignedDevices = append(configuration.AssignedDevices, hcsshim.AssignedDevice{InterfaceClassGUID: d.ID})
}
@@ -323,38 +330,32 @@ func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions inter
hcsContainer, err := hcsshim.CreateContainer(id, configuration)
if err != nil {
- return err
+ return nil, err
}
// Construct a container object for calling start on it.
ctr := &container{
+ client: c,
id: id,
- execs: make(map[string]*process),
ociSpec: spec,
hcsContainer: hcsContainer,
- status: containerd.Created,
- waitCh: make(chan struct{}),
}
logger.Debug("starting container")
- if err = hcsContainer.Start(); err != nil {
- c.logger.WithError(err).Error("failed to start container")
- ctr.Lock()
- if err := c.terminateContainer(ctr); err != nil {
- c.logger.WithError(err).Error("failed to cleanup after a failed Start")
+ if err := ctr.hcsContainer.Start(); err != nil {
+ logger.WithError(err).Error("failed to start container")
+ ctr.mu.Lock()
+ if err := ctr.terminateContainer(); err != nil {
+ logger.WithError(err).Error("failed to cleanup after a failed Start")
} else {
- c.logger.Debug("cleaned up after failed Start by calling Terminate")
+ logger.Debug("cleaned up after failed Start by calling Terminate")
}
- ctr.Unlock()
- return err
+ ctr.mu.Unlock()
+ return nil, err
}
- c.Lock()
- c.containers[id] = ctr
- c.Unlock()
-
logger.Debug("createWindows() completed successfully")
- return nil
+ return ctr, nil
}
@@ -388,16 +389,18 @@ func (c *client) extractResourcesFromSpec(spec *specs.Spec, configuration *hcssh
}
}
-func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
- ctr := c.getContainer(id)
+func (ctr *container) Start(_ context.Context, _ string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) {
+ ctr.mu.Lock()
+ defer ctr.mu.Unlock()
+
switch {
- case ctr == nil:
- return -1, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
- case ctr.init != nil:
- return -1, errors.WithStack(errdefs.NotModified(errors.New("container already started")))
+ case ctr.ociSpec == nil:
+ return nil, errors.WithStack(errdefs.NotImplemented(errors.New("a restored container cannot be started")))
+ case ctr.task != nil:
+ return nil, errors.WithStack(errdefs.NotModified(containerderrdefs.ErrAlreadyExists))
}
- logger := c.logger.WithField("container", id)
+ logger := ctr.client.logger.WithField("container", ctr.id)
// Note we always tell HCS to create stdout as it's required
// regardless of '-i' or '-t' options, so that docker can always grab
@@ -435,32 +438,13 @@ func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachSt
createProcessParms.User = ctr.ociSpec.Process.User.Username
- ctr.Lock()
-
// Start the command running in the container.
newProcess, err := ctr.hcsContainer.CreateProcess(createProcessParms)
if err != nil {
logger.WithError(err).Error("CreateProcess() failed")
- // Fix for https://github.com/moby/moby/issues/38719.
- // If the init process failed to launch, we still need to reap the
- // container to avoid leaking it.
- //
- // Note we use the explicit exit code of 127 which is the
- // Linux shell equivalent of "command not found". Windows cannot
- // know ahead of time whether or not the command exists, especially
- // in the case of Hyper-V containers.
- ctr.Unlock()
- exitedAt := time.Now()
- p := &process{
- id: libcontainerdtypes.InitProcessName,
- pid: 0,
- }
- c.reapContainer(ctr, p, 127, exitedAt, nil, logger)
- return -1, err
+ return nil, err
}
- defer ctr.Unlock()
-
defer func() {
if err != nil {
if err := newProcess.Kill(); err != nil {
@@ -476,55 +460,69 @@ func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachSt
}()
}
}()
- p := &process{
- hcsProcess: newProcess,
+ t := &task{process: process{
id: libcontainerdtypes.InitProcessName,
- pid: newProcess.Pid(),
- }
- logger.WithField("pid", p.pid).Debug("init process started")
-
- ctr.status = containerd.Running
- ctr.init = p
+ ctr: ctr,
+ hcsProcess: newProcess,
+ waitCh: make(chan struct{}),
+ }}
+ pid := t.Pid()
+ logger.WithField("pid", pid).Debug("init process started")
- // Spin up a go routine waiting for exit to handle cleanup
- go c.reapProcess(ctr, p)
+ // Spin up a goroutine to notify the backend and clean up resources when
+ // the task exits. Defer until after the start event is sent so that the
+ // exit event is not sent out-of-order.
+ defer func() { go t.reap() }()
// Don't shadow err here due to our deferred clean-up.
var dio *cio.DirectIO
dio, err = newIOFromProcess(newProcess, ctr.ociSpec.Process.Terminal)
if err != nil {
logger.WithError(err).Error("failed to get stdio pipes")
- return -1, err
+ return nil, err
}
_, err = attachStdio(dio)
if err != nil {
logger.WithError(err).Error("failed to attach stdio")
- return -1, err
+ return nil, err
}
+ // All fallible operations have succeeded so it is now safe to set the
+ // container's current task.
+ ctr.task = t
+
// Generate the associated event
- c.eventQ.Append(id, func() {
+ ctr.client.eventQ.Append(ctr.id, func() {
ei := libcontainerdtypes.EventInfo{
- ContainerID: id,
+ ContainerID: ctr.id,
ProcessID: libcontainerdtypes.InitProcessName,
- Pid: uint32(p.pid),
+ Pid: pid,
}
- c.logger.WithFields(logrus.Fields{
+ ctr.client.logger.WithFields(logrus.Fields{
"container": ctr.id,
"event": libcontainerdtypes.EventStart,
"event-info": ei,
}).Info("sending event")
- err := c.backend.ProcessEvent(ei.ContainerID, libcontainerdtypes.EventStart, ei)
+ err := ctr.client.backend.ProcessEvent(ei.ContainerID, libcontainerdtypes.EventStart, ei)
if err != nil {
- c.logger.WithError(err).WithFields(logrus.Fields{
- "container": id,
+ ctr.client.logger.WithError(err).WithFields(logrus.Fields{
+ "container": ei.ContainerID,
"event": libcontainerdtypes.EventStart,
"event-info": ei,
}).Error("failed to process event")
}
})
logger.Debug("start() completed")
- return p.pid, nil
+ return t, nil
+}
+
+func (ctr *container) Task(context.Context) (libcontainerdtypes.Task, error) {
+ ctr.mu.Lock()
+ defer ctr.mu.Unlock()
+ if ctr.task == nil {
+ return nil, errdefs.NotFound(containerderrdefs.ErrNotFound)
+ }
+ return ctr.task, nil
}
// setCommandLineAndArgs configures the HCS ProcessConfig based on an OCI process spec
@@ -554,19 +552,18 @@ func newIOFromProcess(newProcess hcsshim.Process, terminal bool) (*cio.DirectIO,
return dio, nil
}
-// Exec adds a process in an running container
-func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
- ctr := c.getContainer(containerID)
- switch {
- case ctr == nil:
- return -1, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
- case ctr.hcsContainer == nil:
- return -1, errors.WithStack(errdefs.InvalidParameter(errors.New("container is not running")))
- case ctr.execs != nil && ctr.execs[processID] != nil:
- return -1, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
- }
- logger := c.logger.WithFields(logrus.Fields{
- "container": containerID,
+// Exec launches a process in a running container.
+//
+// The processID argument is entirely informational. As there is no mechanism
+// (exposed through the libcontainerd interfaces) to enumerate or reference an
+// exec'd process by ID, uniqueness is not currently enforced.
+func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Process, error) {
+ hcsContainer, err := t.getHCSContainer()
+ if err != nil {
+ return nil, err
+ }
+ logger := t.ctr.client.logger.WithFields(logrus.Fields{
+ "container": t.ctr.id,
"exec": processID,
})
@@ -593,7 +590,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
if spec.Cwd != "" {
createProcessParms.WorkingDirectory = spec.Cwd
} else {
- createProcessParms.WorkingDirectory = ctr.ociSpec.Process.Cwd
+ createProcessParms.WorkingDirectory = t.ctr.ociSpec.Process.Cwd
}
// Configure the environment for the process
@@ -606,10 +603,10 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
createProcessParms.User = spec.User.Username
// Start the command running in the container.
- newProcess, err := ctr.hcsContainer.CreateProcess(createProcessParms)
+ newProcess, err := hcsContainer.CreateProcess(createProcessParms)
if err != nil {
logger.WithError(err).Errorf("exec's CreateProcess() failed")
- return -1, err
+ return nil, err
}
pid := newProcess.Pid()
defer func() {
@@ -631,163 +628,180 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
dio, err := newIOFromProcess(newProcess, spec.Terminal)
if err != nil {
logger.WithError(err).Error("failed to get stdio pipes")
- return -1, err
+ return nil, err
}
// Tell the engine to attach streams back to the client
_, err = attachStdio(dio)
if err != nil {
- return -1, err
+ return nil, err
}
p := &process{
id: processID,
- pid: pid,
+ ctr: t.ctr,
hcsProcess: newProcess,
+ waitCh: make(chan struct{}),
}
- // Add the process to the container's list of processes
- ctr.Lock()
- ctr.execs[processID] = p
- ctr.Unlock()
-
- // Spin up a go routine waiting for exit to handle cleanup
- go c.reapProcess(ctr, p)
+ // Spin up a goroutine to notify the backend and clean up resources when
+ // the process exits. Defer until after the start event is sent so that
+ // the exit event is not sent out-of-order.
+ defer func() { go p.reap() }()
- c.eventQ.Append(ctr.id, func() {
+ t.ctr.client.eventQ.Append(t.ctr.id, func() {
ei := libcontainerdtypes.EventInfo{
- ContainerID: ctr.id,
+ ContainerID: t.ctr.id,
ProcessID: p.id,
- Pid: uint32(p.pid),
+ Pid: uint32(pid),
}
- c.logger.WithFields(logrus.Fields{
- "container": ctr.id,
+ t.ctr.client.logger.WithFields(logrus.Fields{
+ "container": t.ctr.id,
"event": libcontainerdtypes.EventExecAdded,
"event-info": ei,
}).Info("sending event")
- err := c.backend.ProcessEvent(ctr.id, libcontainerdtypes.EventExecAdded, ei)
+ err := t.ctr.client.backend.ProcessEvent(t.ctr.id, libcontainerdtypes.EventExecAdded, ei)
if err != nil {
- c.logger.WithError(err).WithFields(logrus.Fields{
- "container": ctr.id,
+ t.ctr.client.logger.WithError(err).WithFields(logrus.Fields{
+ "container": t.ctr.id,
"event": libcontainerdtypes.EventExecAdded,
"event-info": ei,
}).Error("failed to process event")
}
- err = c.backend.ProcessEvent(ctr.id, libcontainerdtypes.EventExecStarted, ei)
+ err = t.ctr.client.backend.ProcessEvent(t.ctr.id, libcontainerdtypes.EventExecStarted, ei)
if err != nil {
- c.logger.WithError(err).WithFields(logrus.Fields{
- "container": ctr.id,
+ t.ctr.client.logger.WithError(err).WithFields(logrus.Fields{
+ "container": t.ctr.id,
"event": libcontainerdtypes.EventExecStarted,
"event-info": ei,
}).Error("failed to process event")
}
})
- return pid, nil
+ return p, nil
+}
+
+func (p *process) Pid() uint32 {
+ p.mu.Lock()
+ hcsProcess := p.hcsProcess
+ p.mu.Unlock()
+ if hcsProcess == nil {
+ return 0
+ }
+ return uint32(hcsProcess.Pid())
+}
+
+func (p *process) Kill(_ context.Context, signal syscall.Signal) error {
+ p.mu.Lock()
+ hcsProcess := p.hcsProcess
+ p.mu.Unlock()
+ if hcsProcess == nil {
+ return errors.WithStack(errdefs.NotFound(errors.New("process not found")))
+ }
+ return hcsProcess.Kill()
}
-// SignalProcess handles `docker stop` on Windows. While Linux has support for
+// Kill handles `docker stop` on Windows. While Linux has support for
// the full range of signals, signals aren't really implemented on Windows.
// We fake supporting regular stop and -9 to force kill.
-func (c *client) SignalProcess(_ context.Context, containerID, processID string, signal syscall.Signal) error {
- ctr, p, err := c.getProcess(containerID, processID)
+func (t *task) Kill(_ context.Context, signal syscall.Signal) error {
+ hcsContainer, err := t.getHCSContainer()
if err != nil {
return err
}
- logger := c.logger.WithFields(logrus.Fields{
- "container": containerID,
- "process": processID,
- "pid": p.pid,
+ logger := t.ctr.client.logger.WithFields(logrus.Fields{
+ "container": t.ctr.id,
+ "process": t.id,
+ "pid": t.Pid(),
"signal": signal,
})
logger.Debug("Signal()")
- if processID == libcontainerdtypes.InitProcessName {
- if syscall.Signal(signal) == syscall.SIGKILL {
- // Terminate the compute system
- ctr.Lock()
- ctr.terminateInvoked = true
- if err := ctr.hcsContainer.Terminate(); err != nil {
- if !hcsshim.IsPending(err) {
- logger.WithError(err).Error("failed to terminate hccshim container")
- }
- }
- ctr.Unlock()
- } else {
- // Shut down the container
- if err := ctr.hcsContainer.Shutdown(); err != nil {
- if !hcsshim.IsPending(err) && !hcsshim.IsAlreadyStopped(err) {
- // ignore errors
- logger.WithError(err).Error("failed to shutdown hccshim container")
- }
- }
- }
+ var op string
+ if signal == syscall.SIGKILL {
+ // Terminate the compute system
+ t.ctr.mu.Lock()
+ t.ctr.terminateInvoked = true
+ t.ctr.mu.Unlock()
+ op, err = "terminate", hcsContainer.Terminate()
} else {
- return p.hcsProcess.Kill()
+ // Shut down the container
+ op, err = "shutdown", hcsContainer.Shutdown()
+ }
+ if err != nil {
+ if !hcsshim.IsPending(err) && !hcsshim.IsAlreadyStopped(err) {
+ // ignore errors
+ logger.WithError(err).Errorf("failed to %s hccshim container", op)
+ }
}
return nil
}
-// ResizeTerminal handles a CLI event to resize an interactive docker run or docker
+// Resize handles a CLI event to resize an interactive docker run or docker
// exec window.
-func (c *client) ResizeTerminal(_ context.Context, containerID, processID string, width, height int) error {
- _, p, err := c.getProcess(containerID, processID)
- if err != nil {
- return err
+func (p *process) Resize(_ context.Context, width, height uint32) error {
+ p.mu.Lock()
+ hcsProcess := p.hcsProcess
+ p.mu.Unlock()
+ if hcsProcess == nil {
+ return errors.WithStack(errdefs.NotFound(errors.New("process not found")))
}
- c.logger.WithFields(logrus.Fields{
- "container": containerID,
- "process": processID,
+ p.ctr.client.logger.WithFields(logrus.Fields{
+ "container": p.ctr.id,
+ "process": p.id,
"height": height,
"width": width,
- "pid": p.pid,
+ "pid": hcsProcess.Pid(),
}).Debug("resizing")
- return p.hcsProcess.ResizeConsole(uint16(width), uint16(height))
+ return hcsProcess.ResizeConsole(uint16(width), uint16(height))
}
-func (c *client) CloseStdin(_ context.Context, containerID, processID string) error {
- _, p, err := c.getProcess(containerID, processID)
- if err != nil {
- return err
+func (p *process) CloseStdin(context.Context) error {
+ p.mu.Lock()
+ hcsProcess := p.hcsProcess
+ p.mu.Unlock()
+ if hcsProcess == nil {
+ return errors.WithStack(errdefs.NotFound(errors.New("process not found")))
}
- return p.hcsProcess.CloseStdin()
+ return hcsProcess.CloseStdin()
}
// Pause handles pause requests for containers
-func (c *client) Pause(_ context.Context, containerID string) error {
- ctr, _, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName)
- if err != nil {
- return err
- }
-
- if ctr.ociSpec.Windows.HyperV == nil {
+func (t *task) Pause(_ context.Context) error {
+ if t.ctr.ociSpec.Windows.HyperV == nil {
return containerderrdefs.ErrNotImplemented
}
- ctr.Lock()
- defer ctr.Unlock()
+ t.ctr.mu.Lock()
+ defer t.ctr.mu.Unlock()
- if err = ctr.hcsContainer.Pause(); err != nil {
+ if err := t.assertIsCurrentTask(); err != nil {
+ return err
+ }
+ if t.ctr.hcsContainer == nil {
+ return errdefs.NotFound(errors.WithStack(fmt.Errorf("container %q not found", t.ctr.id)))
+ }
+ if err := t.ctr.hcsContainer.Pause(); err != nil {
return err
}
- ctr.status = containerd.Paused
+ t.ctr.isPaused = true
- c.eventQ.Append(containerID, func() {
- err := c.backend.ProcessEvent(containerID, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{
- ContainerID: containerID,
+ t.ctr.client.eventQ.Append(t.ctr.id, func() {
+ err := t.ctr.client.backend.ProcessEvent(t.ctr.id, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{
+ ContainerID: t.ctr.id,
ProcessID: libcontainerdtypes.InitProcessName,
})
- c.logger.WithFields(logrus.Fields{
- "container": ctr.id,
+ t.ctr.client.logger.WithFields(logrus.Fields{
+ "container": t.ctr.id,
"event": libcontainerdtypes.EventPaused,
}).Info("sending event")
if err != nil {
- c.logger.WithError(err).WithFields(logrus.Fields{
- "container": containerID,
+ t.ctr.client.logger.WithError(err).WithFields(logrus.Fields{
+ "container": t.ctr.id,
"event": libcontainerdtypes.EventPaused,
}).Error("failed to process event")
}
@@ -797,37 +811,38 @@ func (c *client) Pause(_ context.Context, containerID string) error {
}
// Resume handles resume requests for containers
-func (c *client) Resume(_ context.Context, containerID string) error {
- ctr, _, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName)
- if err != nil {
- return err
- }
-
- if ctr.ociSpec.Windows.HyperV == nil {
+func (t *task) Resume(ctx context.Context) error {
+ if t.ctr.ociSpec.Windows.HyperV == nil {
return errors.New("cannot resume Windows Server Containers")
}
- ctr.Lock()
- defer ctr.Unlock()
+ t.ctr.mu.Lock()
+ defer t.ctr.mu.Unlock()
- if err = ctr.hcsContainer.Resume(); err != nil {
+ if err := t.assertIsCurrentTask(); err != nil {
+ return err
+ }
+ if t.ctr.hcsContainer == nil {
+ return errdefs.NotFound(errors.WithStack(fmt.Errorf("container %q not found", t.ctr.id)))
+ }
+ if err := t.ctr.hcsContainer.Resume(); err != nil {
return err
}
- ctr.status = containerd.Running
+ t.ctr.isPaused = false
- c.eventQ.Append(containerID, func() {
- err := c.backend.ProcessEvent(containerID, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{
- ContainerID: containerID,
+ t.ctr.client.eventQ.Append(t.ctr.id, func() {
+ err := t.ctr.client.backend.ProcessEvent(t.ctr.id, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{
+ ContainerID: t.ctr.id,
ProcessID: libcontainerdtypes.InitProcessName,
})
- c.logger.WithFields(logrus.Fields{
- "container": ctr.id,
+ t.ctr.client.logger.WithFields(logrus.Fields{
+ "container": t.ctr.id,
"event": libcontainerdtypes.EventResumed,
}).Info("sending event")
if err != nil {
- c.logger.WithError(err).WithFields(logrus.Fields{
- "container": containerID,
+ t.ctr.client.logger.WithError(err).WithFields(logrus.Fields{
+ "container": t.ctr.id,
"event": libcontainerdtypes.EventResumed,
}).Error("failed to process event")
}
@@ -837,14 +852,14 @@ func (c *client) Resume(_ context.Context, containerID string) error {
}
// Stats handles stats requests for containers
-func (c *client) Stats(_ context.Context, containerID string) (*libcontainerdtypes.Stats, error) {
- ctr, _, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName)
+func (t *task) Stats(_ context.Context) (*libcontainerdtypes.Stats, error) {
+ hc, err := t.getHCSContainer()
if err != nil {
return nil, err
}
readAt := time.Now()
- s, err := ctr.hcsContainer.Statistics()
+ s, err := hc.Statistics()
if err != nil {
return nil, err
}
@@ -854,9 +869,9 @@ func (c *client) Stats(_ context.Context, containerID string) (*libcontainerdtyp
}, nil
}
-// Restore is the handler for restoring a container
-func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (bool, int, libcontainerdtypes.Process, error) {
- c.logger.WithField("container", id).Debug("restore()")
+// LoadContainer is the handler for restoring a container
+func (c *client) LoadContainer(ctx context.Context, id string) (libcontainerdtypes.Container, error) {
+ c.logger.WithField("container", id).Debug("LoadContainer()")
// TODO Windows: On RS1, a re-attach isn't possible.
// However, there is a scenario in which there is an issue.
@@ -865,30 +880,40 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio libcontaine
// For consistency, we call in to shoot it regardless of whether HCS knows about it.
// We explicitly just log a warning if the terminate fails.
// Then we tell the backend the container exited.
- if hc, err := hcsshim.OpenContainer(id); err == nil {
- const terminateTimeout = time.Minute * 2
- err := hc.Terminate()
-
- if hcsshim.IsPending(err) {
- err = hc.WaitTimeout(terminateTimeout)
- } else if hcsshim.IsAlreadyStopped(err) {
- err = nil
- }
+ hc, err := hcsshim.OpenContainer(id)
+ if err != nil {
+ return nil, errdefs.NotFound(errors.New("container not found"))
+ }
+ const terminateTimeout = time.Minute * 2
+ err = hc.Terminate()
- if err != nil {
- c.logger.WithField("container", id).WithError(err).Debug("terminate failed on restore")
- return false, -1, nil, err
- }
+ if hcsshim.IsPending(err) {
+ err = hc.WaitTimeout(terminateTimeout)
+ } else if hcsshim.IsAlreadyStopped(err) {
+ err = nil
+ }
+
+ if err != nil {
+ c.logger.WithField("container", id).WithError(err).Debug("terminate failed on restore")
+ return nil, err
}
- return false, -1, &restoredProcess{
- c: c,
- id: id,
+ return &container{
+ client: c,
+ hcsContainer: hc,
+ id: id,
}, nil
}
-// ListPids returns a list of process IDs running in a container. It is not
+// AttachTask is only called by the daemon when restoring containers. As
+// re-attach isn't possible (see LoadContainer), a NotFound error is
+// unconditionally returned to allow restore to make progress.
+func (*container) AttachTask(context.Context, libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) {
+ return nil, errdefs.NotFound(containerderrdefs.ErrNotImplemented)
+}
+
+// Pids returns a list of process IDs running in a container. It is not
// implemented on Windows.
-func (c *client) ListPids(_ context.Context, _ string) ([]uint32, error) {
+func (t *task) Pids(context.Context) ([]containerd.ProcessInfo, error) {
return nil, errors.New("not implemented on Windows")
}
@@ -898,13 +923,13 @@ func (c *client) ListPids(_ context.Context, _ string) ([]uint32, error) {
// the containers could be Hyper-V containers, they would not be
// visible on the container host. However, libcontainerd does have
// that information.
-func (c *client) Summary(_ context.Context, containerID string) ([]libcontainerdtypes.Summary, error) {
- ctr, _, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName)
+func (t *task) Summary(_ context.Context) ([]libcontainerdtypes.Summary, error) {
+ hc, err := t.getHCSContainer()
if err != nil {
return nil, err
}
- p, err := ctr.hcsContainer.ProcessList()
+ p, err := hc.ProcessList()
if err != nil {
return nil, err
}
@@ -926,118 +951,114 @@ func (c *client) Summary(_ context.Context, containerID string) ([]libcontainerd
return pl, nil
}
-type restoredProcess struct {
- id string
- c *client
-}
-
-func (p *restoredProcess) Delete(ctx context.Context) (uint32, time.Time, error) {
- return p.c.DeleteTask(ctx, p.id)
-}
-
-func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
- ec := -1
- ctr := c.getContainer(containerID)
- if ctr == nil {
- return uint32(ec), time.Now(), errors.WithStack(errdefs.NotFound(errors.New("no such container")))
+func (p *process) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
+ select {
+ case <-ctx.Done():
+ return nil, errors.WithStack(ctx.Err())
+ case <-p.waitCh:
+ default:
+ return nil, errdefs.Conflict(errors.New("process is running"))
}
+ return p.exited, nil
+}
+func (t *task) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
select {
case <-ctx.Done():
- return uint32(ec), time.Now(), errors.WithStack(ctx.Err())
- case <-ctr.waitCh:
+ return nil, errors.WithStack(ctx.Err())
+ case <-t.waitCh:
default:
- return uint32(ec), time.Now(), errors.New("container is not stopped")
+ return nil, errdefs.Conflict(errors.New("container is not stopped"))
}
- ctr.Lock()
- defer ctr.Unlock()
- return ctr.exitCode, ctr.exitedAt, nil
+ t.ctr.mu.Lock()
+ defer t.ctr.mu.Unlock()
+ if err := t.assertIsCurrentTask(); err != nil {
+ return nil, err
+ }
+ t.ctr.task = nil
+ return t.exited, nil
}
-func (c *client) Delete(_ context.Context, containerID string) error {
- c.Lock()
- defer c.Unlock()
- ctr := c.containers[containerID]
- if ctr == nil {
- return errors.WithStack(errdefs.NotFound(errors.New("no such container")))
+func (t *task) ForceDelete(ctx context.Context) error {
+ select {
+ case <-t.waitCh: // Task is already stopped.
+ _, err := t.Delete(ctx)
+ return err
+ default:
}
- ctr.Lock()
- defer ctr.Unlock()
-
- switch ctr.status {
- case containerd.Created:
- if err := c.shutdownContainer(ctr); err != nil {
- return err
- }
- fallthrough
- case containerd.Stopped:
- delete(c.containers, containerID)
- return nil
+ if err := t.Kill(ctx, syscall.SIGKILL); err != nil {
+ return errors.Wrap(err, "could not force-kill task")
}
- return errors.WithStack(errdefs.InvalidParameter(errors.New("container is not stopped")))
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-t.waitCh:
+ _, err := t.Delete(ctx)
+ return err
+ }
}
-func (c *client) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) {
- c.Lock()
- defer c.Unlock()
- ctr := c.containers[containerID]
- if ctr == nil {
- return containerd.Unknown, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
+func (t *task) Status(ctx context.Context) (containerd.Status, error) {
+ select {
+ case <-t.waitCh:
+ return containerd.Status{
+ Status: containerd.Stopped,
+ ExitStatus: t.exited.ExitCode(),
+ ExitTime: t.exited.ExitTime(),
+ }, nil
+ default:
}
- ctr.Lock()
- defer ctr.Unlock()
- return ctr.status, nil
+ t.ctr.mu.Lock()
+ defer t.ctr.mu.Unlock()
+ s := containerd.Running
+ if t.ctr.isPaused {
+ s = containerd.Paused
+ }
+ return containerd.Status{Status: s}, nil
}
-func (c *client) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error {
+func (*task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error {
// Updating resources isn't supported on Windows,
// but we return nil so that updating a container succeeds as a no-op.
return nil
}
-func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
+func (*task) CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error {
return errors.New("Windows: Containers do not support checkpoints")
}
-func (c *client) getContainer(id string) *container {
- c.Lock()
- ctr := c.containers[id]
- c.Unlock()
-
- return ctr
+// assertIsCurrentTask returns a non-nil error if the task has been deleted.
+func (t *task) assertIsCurrentTask() error {
+ if t.ctr.task != t {
+ return errors.WithStack(errdefs.NotFound(fmt.Errorf("task %q not found", t.id)))
+ }
+ return nil
}
-func (c *client) getProcess(containerID, processID string) (*container, *process, error) {
- ctr := c.getContainer(containerID)
- switch {
- case ctr == nil:
- return nil, nil, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
- case ctr.init == nil:
- return nil, nil, errors.WithStack(errdefs.NotFound(errors.New("container is not running")))
- case processID == libcontainerdtypes.InitProcessName:
- return ctr, ctr.init, nil
- default:
- ctr.Lock()
- defer ctr.Unlock()
- if ctr.execs == nil {
- return nil, nil, errors.WithStack(errdefs.NotFound(errors.New("no execs")))
- }
+// getHCSContainer returns a reference to the hcsshim Container for the task's
+// container if neither the task nor the container has been deleted.
+//
+// t.ctr.mu must not be locked by the calling goroutine when calling this
+// function.
+func (t *task) getHCSContainer() (hcsshim.Container, error) {
+ t.ctr.mu.Lock()
+ defer t.ctr.mu.Unlock()
+ if err := t.assertIsCurrentTask(); err != nil {
+ return nil, err
}
-
- p := ctr.execs[processID]
- if p == nil {
- return nil, nil, errors.WithStack(errdefs.NotFound(errors.New("no such exec")))
+ hc := t.ctr.hcsContainer
+ if hc == nil {
+ return nil, errors.WithStack(errdefs.NotFound(fmt.Errorf("container %q not found", t.ctr.id)))
}
-
- return ctr, p, nil
+ return hc, nil
}
// ctr mutex must be held when calling this function.
-func (c *client) shutdownContainer(ctr *container) error {
+func (ctr *container) shutdownContainer() error {
var err error
const waitTimeout = time.Minute * 5
@@ -1052,11 +1073,11 @@ func (c *client) shutdownContainer(ctr *container) error {
}
if err != nil {
- c.logger.WithError(err).WithField("container", ctr.id).
+ ctr.client.logger.WithError(err).WithField("container", ctr.id).
Debug("failed to shutdown container, terminating it")
- terminateErr := c.terminateContainer(ctr)
+ terminateErr := ctr.terminateContainer()
if terminateErr != nil {
- c.logger.WithError(terminateErr).WithField("container", ctr.id).
+ ctr.client.logger.WithError(terminateErr).WithField("container", ctr.id).
Error("failed to shutdown container, and subsequent terminate also failed")
return fmt.Errorf("%s: subsequent terminate failed %s", err, terminateErr)
}
@@ -1067,7 +1088,7 @@ func (c *client) shutdownContainer(ctr *container) error {
}
// ctr mutex must be held when calling this function.
-func (c *client) terminateContainer(ctr *container) error {
+func (ctr *container) terminateContainer() error {
const terminateTimeout = time.Minute * 5
ctr.terminateInvoked = true
err := ctr.hcsContainer.Terminate()
@@ -1079,7 +1100,7 @@ func (c *client) terminateContainer(ctr *container) error {
}
if err != nil {
- c.logger.WithError(err).WithField("container", ctr.id).
+ ctr.client.logger.WithError(err).WithField("container", ctr.id).
Debug("failed to terminate container")
return err
}
@@ -1087,9 +1108,9 @@ func (c *client) terminateContainer(ctr *container) error {
return nil
}
-func (c *client) reapProcess(ctr *container, p *process) int {
- logger := c.logger.WithFields(logrus.Fields{
- "container": ctr.id,
+func (p *process) reap() {
+ logger := p.ctr.client.logger.WithFields(logrus.Fields{
+ "container": p.ctr.id,
"process": p.id,
})
@@ -1100,10 +1121,9 @@ func (c *client) reapProcess(ctr *container, p *process) int {
if herr, ok := err.(*hcsshim.ProcessError); ok && herr.Err != windows.ERROR_BROKEN_PIPE {
logger.WithError(err).Warnf("Wait() failed (container may have been killed)")
}
- // Fall through here, do not return. This ensures we attempt to
- // continue the shutdown in HCS and tell the docker engine that the
- // process/container has exited to avoid a container being dropped on
- // the floor.
+ // Fall through here, do not return. This ensures we tell the
+ // docker engine that the process/container has exited to avoid
+ // a container being dropped on the floor.
}
exitedAt := time.Now()
@@ -1116,87 +1136,88 @@ func (c *client) reapProcess(ctr *container, p *process) int {
// code we return doesn't incorrectly indicate success.
exitCode = -1
- // Fall through here, do not return. This ensures we attempt to
- // continue the shutdown in HCS and tell the docker engine that the
- // process/container has exited to avoid a container being dropped on
- // the floor.
+ // Fall through here, do not return. This ensures we tell the
+ // docker engine that the process/container has exited to avoid
+ // a container being dropped on the floor.
}
- if err := p.hcsProcess.Close(); err != nil {
+ p.mu.Lock()
+ hcsProcess := p.hcsProcess
+ p.hcsProcess = nil
+ p.mu.Unlock()
+
+ if err := hcsProcess.Close(); err != nil {
logger.WithError(err).Warnf("failed to cleanup hcs process resources")
exitCode = -1
eventErr = fmt.Errorf("hcsProcess.Close() failed %s", err)
}
- if p.id == libcontainerdtypes.InitProcessName {
- exitCode, eventErr = c.reapContainer(ctr, p, exitCode, exitedAt, eventErr, logger)
- }
+ // Explicit locking is not required as reads from exited are
+ // synchronized using waitCh.
+ p.exited = containerd.NewExitStatus(uint32(exitCode), exitedAt, nil)
+ close(p.waitCh)
- c.eventQ.Append(ctr.id, func() {
+ p.ctr.client.eventQ.Append(p.ctr.id, func() {
ei := libcontainerdtypes.EventInfo{
- ContainerID: ctr.id,
+ ContainerID: p.ctr.id,
ProcessID: p.id,
- Pid: uint32(p.pid),
+ Pid: uint32(hcsProcess.Pid()),
ExitCode: uint32(exitCode),
ExitedAt: exitedAt,
Error: eventErr,
}
- c.logger.WithFields(logrus.Fields{
- "container": ctr.id,
+ p.ctr.client.logger.WithFields(logrus.Fields{
+ "container": p.ctr.id,
"event": libcontainerdtypes.EventExit,
"event-info": ei,
}).Info("sending event")
- err := c.backend.ProcessEvent(ctr.id, libcontainerdtypes.EventExit, ei)
+ err := p.ctr.client.backend.ProcessEvent(p.ctr.id, libcontainerdtypes.EventExit, ei)
if err != nil {
- c.logger.WithError(err).WithFields(logrus.Fields{
- "container": ctr.id,
+ p.ctr.client.logger.WithError(err).WithFields(logrus.Fields{
+ "container": p.ctr.id,
"event": libcontainerdtypes.EventExit,
"event-info": ei,
}).Error("failed to process event")
}
- if p.id != libcontainerdtypes.InitProcessName {
- ctr.Lock()
- delete(ctr.execs, p.id)
- ctr.Unlock()
- }
})
-
- return exitCode
}
-// reapContainer shuts down the container and releases associated resources. It returns
-// the error to be logged in the eventInfo sent back to the monitor.
-func (c *client) reapContainer(ctr *container, p *process, exitCode int, exitedAt time.Time, eventErr error, logger *logrus.Entry) (int, error) {
- // Update container status
- ctr.Lock()
- ctr.status = containerd.Stopped
- ctr.exitedAt = exitedAt
- ctr.exitCode = uint32(exitCode)
- close(ctr.waitCh)
-
- if err := c.shutdownContainer(ctr); err != nil {
- exitCode = -1
- logger.WithError(err).Warn("failed to shutdown container")
- thisErr := errors.Wrap(err, "failed to shutdown container")
- if eventErr != nil {
- eventErr = errors.Wrap(eventErr, thisErr.Error())
- } else {
- eventErr = thisErr
+func (ctr *container) Delete(context.Context) error {
+ ctr.mu.Lock()
+ defer ctr.mu.Unlock()
+
+ if ctr.hcsContainer == nil {
+ return errors.WithStack(errdefs.NotFound(fmt.Errorf("container %q not found", ctr.id)))
+ }
+
+ // Check that there is no task currently running.
+ if ctr.task != nil {
+ select {
+ case <-ctr.task.waitCh:
+ default:
+ return errors.WithStack(errdefs.Conflict(errors.New("container is not stopped")))
}
+ }
+
+ var (
+ logger = ctr.client.logger.WithFields(logrus.Fields{
+ "container": ctr.id,
+ })
+ thisErr error
+ )
+
+ if err := ctr.shutdownContainer(); err != nil {
+ logger.WithError(err).Warn("failed to shutdown container")
+ thisErr = errors.Wrap(err, "failed to shutdown container")
} else {
logger.Debug("completed container shutdown")
}
- ctr.Unlock()
if err := ctr.hcsContainer.Close(); err != nil {
- exitCode = -1
logger.WithError(err).Error("failed to clean hcs container resources")
- thisErr := errors.Wrap(err, "failed to terminate container")
- if eventErr != nil {
- eventErr = errors.Wrap(eventErr, thisErr.Error())
- } else {
- eventErr = thisErr
- }
+ thisErr = errors.Wrap(err, "failed to terminate container")
}
- return exitCode, eventErr
+
+ ctr.hcsContainer = nil
+ return thisErr
}
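
Nearly every method in the rewritten file relies on one publication idiom, spelled out in the struct comments at the top: exited is written exactly once by the reaper, after which waitCh is closed, so any goroutine that has observed the closed channel may read exited without holding a lock. A minimal, self-contained illustration of the idiom (toy types, not the moby code):

    package main

    import (
        "errors"
        "fmt"
    )

    type proc struct {
        exited int           // valid iff waitCh is closed
        waitCh chan struct{} // closed by the reaper after exited is set
    }

    func (p *proc) reap(code int) {
        p.exited = code // happens-before the close below (Go memory model)
        close(p.waitCh)
    }

    // delete mirrors (*process).Delete: conflict while running, else status.
    func (p *proc) delete() (int, error) {
        select {
        case <-p.waitCh:
            return p.exited, nil // safe lock-free read
        default:
            return 0, errors.New("process is running")
        }
    }

    func main() {
        p := &proc{waitCh: make(chan struct{})}
        done := make(chan struct{})
        go func() { p.reap(42); close(done) }()
        <-done
        code, err := p.delete()
        fmt.Println(code, err) // 42 <nil>
    }
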
diff --git a/libcontainerd/local/process_windows.go b/libcontainerd/local/process_windows.go
index 6ff9f7e83e..c8164be987 100644
--- a/libcontainerd/local/process_windows.go
+++ b/libcontainerd/local/process_windows.go
@@ -38,7 +38,3 @@ func createStdInCloser(pipe io.WriteCloser, process hcsshim.Process) io.WriteClo
return nil
})
}
-
-func (p *process) Cleanup() error {
- return nil
-}
diff --git a/libcontainerd/remote/client.go b/libcontainerd/remote/client.go
index a2b895b328..4af29300ab 100644
--- a/libcontainerd/remote/client.go
+++ b/libcontainerd/remote/client.go
@@ -45,25 +45,34 @@ type client struct {
logger *logrus.Entry
ns string
- backend libcontainerdtypes.Backend
- eventQ queue.Queue
- oomMu sync.Mutex
- oom map[string]bool
- v2runcoptionsMu sync.Mutex
- // v2runcoptions is used for copying options specified on Create() to Start()
- v2runcoptions map[string]v2runcoptions.Options
+ backend libcontainerdtypes.Backend
+ eventQ queue.Queue
+}
+
+type container struct {
+ client *client
+ c8dCtr containerd.Container
+
+ v2runcoptions *v2runcoptions.Options
+}
+
+type task struct {
+ containerd.Task
+ ctr *container
+}
+
+type process struct {
+ containerd.Process
}
// NewClient creates a new libcontainerd client from a containerd client
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
c := &client{
- client: cli,
- stateDir: stateDir,
- logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns),
- ns: ns,
- backend: b,
- oom: make(map[string]bool),
- v2runcoptions: make(map[string]v2runcoptions.Options),
+ client: cli,
+ stateDir: stateDir,
+ logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns),
+ ns: ns,
+ backend: b,
}
go c.processEventStream(ctx, ns)
@@ -75,58 +84,36 @@ func (c *client) Version(ctx context.Context) (containerd.Version, error) {
return c.client.Version(ctx)
}
-// Restore loads the containerd container.
-// It should not be called concurrently with any other operation for the given ID.
-func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) {
+func (c *container) newTask(t containerd.Task) *task {
+ return &task{Task: t, ctr: c}
+}
+
+func (c *container) AttachTask(ctx context.Context, attachStdio libcontainerdtypes.StdioCallback) (_ libcontainerdtypes.Task, err error) {
var dio *cio.DirectIO
defer func() {
if err != nil && dio != nil {
dio.Cancel()
dio.Close()
}
- err = wrapError(err)
}()
- ctr, err := c.client.LoadContainer(ctx, id)
- if err != nil {
- return false, -1, nil, errors.WithStack(wrapError(err))
- }
-
attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
// dio must be assigned to the previously defined dio for the defer above
// to handle cleanup
- dio, err = c.newDirectIO(ctx, fifos)
+ dio, err = c.client.newDirectIO(ctx, fifos)
if err != nil {
return nil, err
}
return attachStdio(dio)
}
- t, err := ctr.Task(ctx, attachIO)
- if err != nil && !containerderrors.IsNotFound(err) {
- return false, -1, nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
- }
-
- if t != nil {
- s, err := t.Status(ctx)
- if err != nil {
- return false, -1, nil, errors.Wrap(wrapError(err), "error getting task status")
- }
- alive = s.Status != containerd.Stopped
- pid = int(t.Pid())
+ t, err := c.c8dCtr.Task(ctx, attachIO)
+ if err != nil {
+ return nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
}
-
- c.logger.WithFields(logrus.Fields{
- "container": id,
- "alive": alive,
- "pid": pid,
- }).Debug("restored container")
-
- return alive, pid, &restoredProcess{
- p: t,
- }, nil
+ return c.newTask(t), nil
}
-func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error {
+func (c *client) NewContainer(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (libcontainerdtypes.Container, error) {
bdir := c.bundleDir(id)
c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
@@ -137,44 +124,43 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, shi
}
opts = append(opts, newOpts...)
- _, err := c.client.NewContainer(ctx, id, opts...)
+ ctr, err := c.client.NewContainer(ctx, id, opts...)
if err != nil {
if containerderrors.IsAlreadyExists(err) {
- return errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
+ return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
}
- return wrapError(err)
+ return nil, wrapError(err)
+ }
+
+ created := container{
+ client: c,
+ c8dCtr: ctr,
}
if x, ok := runtimeOptions.(*v2runcoptions.Options); ok {
- c.v2runcoptionsMu.Lock()
- c.v2runcoptions[id] = *x
- c.v2runcoptionsMu.Unlock()
+ created.v2runcoptions = x
}
- return nil
+ return &created, nil
}
// Start creates and starts a task for the specified containerd id
-func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
- ctr, err := c.getContainer(ctx, id)
- if err != nil {
- return -1, err
- }
+func (c *container) Start(ctx context.Context, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) {
var (
cp *types.Descriptor
t containerd.Task
rio cio.IO
- stdinCloseSync = make(chan struct{})
+ stdinCloseSync = make(chan containerd.Process, 1)
)
if checkpointDir != "" {
// write checkpoint to the content store
tar := archive.Diff(ctx, "", checkpointDir)
- cp, err = c.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
+ var err error
+ cp, err = c.client.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
// remove the checkpoint when we're done
defer func() {
if cp != nil {
- err := c.client.ContentStore().Delete(context.Background(), cp.Digest)
+ err := c.client.client.ContentStore().Delete(ctx, cp.Digest)
if err != nil {
- c.logger.WithError(err).WithFields(logrus.Fields{
+ c.client.logger.WithError(err).WithFields(logrus.Fields{
"ref": checkpointDir,
"digest": cp.Digest,
}).Warnf("failed to delete temporary checkpoint entry")
@@ -182,23 +168,27 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
}
}()
if err := tar.Close(); err != nil {
- return -1, errors.Wrap(err, "failed to close checkpoint tar stream")
+ return nil, errors.Wrap(err, "failed to close checkpoint tar stream")
}
if err != nil {
- return -1, errors.Wrapf(err, "failed to upload checkpoint to containerd")
+ return nil, errors.Wrapf(err, "failed to upload checkpoint to containerd")
}
}
- spec, err := ctr.Spec(ctx)
+ // Optimization: assume the relevant metadata has not changed in the
+ // moment since the container was created. Elide redundant RPC requests
+ // to refresh the metadata separately for spec and labels.
+ md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
if err != nil {
- return -1, errors.Wrap(err, "failed to retrieve spec")
+ return nil, errors.Wrap(err, "failed to retrieve metadata")
}
- labels, err := ctr.Labels(ctx)
- if err != nil {
- return -1, errors.Wrap(err, "failed to retrieve labels")
+ bundle := md.Labels[DockerContainerBundlePath]
+
+ var spec specs.Spec
+ if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
+ return nil, errors.Wrap(err, "failed to retrieve spec")
}
- bundle := labels[DockerContainerBundlePath]
- uid, gid := getSpecUser(spec)
+ uid, gid := getSpecUser(&spec)
taskOpts := []containerd.NewTaskOpts{
func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
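The single `Info(ctx, containerd.WithoutRefreshedMetadata)` call above replaces the former pair of `Spec` and `Labels` RPCs: the metadata snapshot already held by the containerd client is reused, and the OCI spec is decoded locally. A sketch of the same call shape against the containerd client API; the label key is the one the diff uses (its value is assumed here), everything else is illustrative:

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/containerd/containerd"
	specs "github.com/opencontainers/runtime-spec/specs-go"
)

// DockerContainerBundlePath is the label key referenced in the diff;
// the value below is assumed for illustration.
const DockerContainerBundlePath = "com.docker/engine.bundle.path"

// loadSpecAndBundle fetches the container metadata in one request,
// skipping the server-side refresh, then unmarshals the spec from the
// returned protobuf Any instead of issuing a second RPC for it.
func loadSpecAndBundle(ctx context.Context, ctr containerd.Container) (*specs.Spec, string, error) {
	md, err := ctr.Info(ctx, containerd.WithoutRefreshedMetadata)
	if err != nil {
		return nil, "", fmt.Errorf("failed to retrieve metadata: %w", err)
	}
	var spec specs.Spec
	if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
		return nil, "", fmt.Errorf("failed to decode spec: %w", err)
	}
	return &spec, md.Labels[DockerContainerBundlePath], nil
}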
@@ -209,10 +199,8 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
if runtime.GOOS != "windows" {
taskOpts = append(taskOpts, func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
- c.v2runcoptionsMu.Lock()
- opts, ok := c.v2runcoptions[id]
- c.v2runcoptionsMu.Unlock()
- if ok {
+ if c.v2runcoptions != nil {
+ opts := *c.v2runcoptions
opts.IoUid = uint32(uid)
opts.IoGid = uint32(gid)
info.Options = &opts
@@ -220,14 +208,14 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
return nil
})
} else {
- taskOpts = append(taskOpts, withLogLevel(c.logger.Level))
+ taskOpts = append(taskOpts, withLogLevel(c.client.logger.Level))
}
- t, err = ctr.NewTask(ctx,
+ t, err = c.c8dCtr.NewTask(ctx,
func(id string) (cio.IO, error) {
fifos := newFIFOSet(bundle, libcontainerdtypes.InitProcessName, withStdin, spec.Process.Terminal)
- rio, err = c.createIO(fifos, id, libcontainerdtypes.InitProcessName, stdinCloseSync, attachStdio)
+ rio, err = c.createIO(fifos, libcontainerdtypes.InitProcessName, stdinCloseSync, attachStdio)
return rio, err
},
taskOpts...,
@@ -238,21 +226,21 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
rio.Cancel()
rio.Close()
}
- return -1, wrapError(err)
+ return nil, errors.Wrap(wrapError(err), "failed to create task for container")
}
// Signal c.createIO that it can call CloseIO
- close(stdinCloseSync)
+ stdinCloseSync <- t
if err := t.Start(ctx); err != nil {
if _, err := t.Delete(ctx); err != nil {
- c.logger.WithError(err).WithField("container", id).
+ c.client.logger.WithError(err).WithField("container", c.c8dCtr.ID()).
Error("failed to delete task after fail start")
}
- return -1, wrapError(err)
+ return nil, wrapError(err)
}
- return int(t.Pid()), nil
+ return c.newTask(t), nil
}
// Exec creates an exec process.
@@ -262,31 +250,21 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
// for the container's main process, the stdin fifo will be created in Create,
// not in the Start call. The stdinCloseSync channel should be closed after the
// exec process has been started.
-func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
- ctr, err := c.getContainer(ctx, containerID)
- if err != nil {
- return -1, err
- }
- t, err := ctr.Task(ctx, nil)
- if err != nil {
- if containerderrors.IsNotFound(err) {
- return -1, errors.WithStack(errdefs.InvalidParameter(errors.New("container is not running")))
- }
- return -1, wrapError(err)
- }
-
+func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Process, error) {
var (
p containerd.Process
rio cio.IO
- stdinCloseSync = make(chan struct{})
+ stdinCloseSync = make(chan containerd.Process, 1)
)
- labels, err := ctr.Labels(ctx)
+ // Optimization: assume the DockerContainerBundlePath label has not been
+ // updated since the container metadata was last loaded/refreshed.
+ md, err := t.ctr.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
if err != nil {
- return -1, wrapError(err)
+ return nil, wrapError(err)
}
- fifos := newFIFOSet(labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)
+ fifos := newFIFOSet(md.Labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)
defer func() {
if err != nil {
@@ -297,22 +275,22 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
}
}()
- p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
- rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio)
+ p, err = t.Task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
+ rio, err = t.ctr.createIO(fifos, processID, stdinCloseSync, attachStdio)
return rio, err
})
if err != nil {
close(stdinCloseSync)
if containerderrors.IsAlreadyExists(err) {
- return -1, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
+ return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
}
- return -1, wrapError(err)
+ return nil, wrapError(err)
}
// Signal c.createIO that it can call CloseIO
//
// the stdin of the exec process will be created after p.Start in containerd
- defer close(stdinCloseSync)
+ defer func() { stdinCloseSync <- p }()
if err = p.Start(ctx); err != nil {
// use new context for cleanup because old one may be cancelled by user, but leave a timeout to make sure
@@ -321,62 +299,29 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
defer cancel()
p.Delete(ctx)
- return -1, wrapError(err)
- }
- return int(p.Pid()), nil
-}
-
-func (c *client) SignalProcess(ctx context.Context, containerID, processID string, signal syscall.Signal) error {
- p, err := c.getProcess(ctx, containerID, processID)
- if err != nil {
- return err
+ return nil, wrapError(err)
}
- return wrapError(p.Kill(ctx, signal))
+ return process{p}, nil
}
-func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
- p, err := c.getProcess(ctx, containerID, processID)
- if err != nil {
- return err
- }
-
- return p.Resize(ctx, uint32(width), uint32(height))
+func (t *task) Kill(ctx context.Context, signal syscall.Signal) error {
+ return wrapError(t.Task.Kill(ctx, signal))
}
-func (c *client) CloseStdin(ctx context.Context, containerID, processID string) error {
- p, err := c.getProcess(ctx, containerID, processID)
- if err != nil {
- return err
- }
-
- return p.CloseIO(ctx, containerd.WithStdinCloser)
+func (p process) Kill(ctx context.Context, signal syscall.Signal) error {
+ return wrapError(p.Process.Kill(ctx, signal))
}
-func (c *client) Pause(ctx context.Context, containerID string) error {
- p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
- if err != nil {
- return err
- }
-
- return wrapError(p.(containerd.Task).Pause(ctx))
+func (t *task) Pause(ctx context.Context) error {
+ return wrapError(t.Task.Pause(ctx))
}
-func (c *client) Resume(ctx context.Context, containerID string) error {
- p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
- if err != nil {
- return err
- }
-
- return p.(containerd.Task).Resume(ctx)
+func (t *task) Resume(ctx context.Context) error {
+ return wrapError(t.Task.Resume(ctx))
}
-func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdtypes.Stats, error) {
- p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
- if err != nil {
- return nil, err
- }
-
- m, err := p.(containerd.Task).Metrics(ctx)
+func (t *task) Stats(ctx context.Context) (*libcontainerdtypes.Stats, error) {
+ m, err := t.Metrics(ctx)
if err != nil {
return nil, err
}
@@ -388,32 +333,8 @@ func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdt
return libcontainerdtypes.InterfaceToStats(m.Timestamp, v), nil
}
-func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, error) {
- p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
- if err != nil {
- return nil, err
- }
-
- pis, err := p.(containerd.Task).Pids(ctx)
- if err != nil {
- return nil, err
- }
-
- var pids []uint32
- for _, i := range pis {
- pids = append(pids, i.Pid)
- }
-
- return pids, nil
-}
-
-func (c *client) Summary(ctx context.Context, containerID string) ([]libcontainerdtypes.Summary, error) {
- p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
- if err != nil {
- return nil, err
- }
-
- pis, err := p.(containerd.Task).Pids(ctx)
+func (t *task) Summary(ctx context.Context) ([]libcontainerdtypes.Summary, error) {
+ pis, err := t.Pids(ctx)
if err != nil {
return nil, err
}
@@ -434,57 +355,31 @@ func (c *client) Summary(ctx context.Context, containerID string) ([]libcontaine
return infos, nil
}
-type restoredProcess struct {
- p containerd.Process
-}
-
-func (p *restoredProcess) Delete(ctx context.Context) (uint32, time.Time, error) {
- if p.p == nil {
- return 255, time.Now(), nil
- }
- status, err := p.p.Delete(ctx)
- if err != nil {
- return 255, time.Now(), nil
- }
- return status.ExitCode(), status.ExitTime(), nil
+func (t *task) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
+ s, err := t.Task.Delete(ctx)
+ return s, wrapError(err)
}
-func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
- p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
- if err != nil {
- return 255, time.Now(), nil
- }
-
- status, err := p.Delete(ctx)
- if err != nil {
- return 255, time.Now(), nil
- }
- return status.ExitCode(), status.ExitTime(), nil
+func (p process) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
+ s, err := p.Process.Delete(ctx)
+ return s, wrapError(err)
}
-func (c *client) Delete(ctx context.Context, containerID string) error {
- ctr, err := c.getContainer(ctx, containerID)
- if err != nil {
- return err
- }
- labels, err := ctr.Labels(ctx)
+func (c *container) Delete(ctx context.Context) error {
+ // Optimization: assume the DockerContainerBundlePath label has not been
+ // updated since the container metadata was last loaded/refreshed.
+ md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
if err != nil {
return err
}
- bundle := labels[DockerContainerBundlePath]
- if err := ctr.Delete(ctx); err != nil {
+ bundle := md.Labels[DockerContainerBundlePath]
+ if err := c.c8dCtr.Delete(ctx); err != nil {
return wrapError(err)
}
- c.oomMu.Lock()
- delete(c.oom, containerID)
- c.oomMu.Unlock()
- c.v2runcoptionsMu.Lock()
- delete(c.v2runcoptions, containerID)
- c.v2runcoptionsMu.Unlock()
if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
if err := os.RemoveAll(bundle); err != nil {
- c.logger.WithError(err).WithFields(logrus.Fields{
- "container": containerID,
+ c.client.logger.WithContext(ctx).WithError(err).WithFields(logrus.Fields{
+ "container": c.c8dCtr.ID(),
"bundle": bundle,
}).Error("failed to remove state dir")
}
@@ -492,28 +387,25 @@ func (c *client) Delete(ctx context.Context, containerID string) error {
return nil
}
-func (c *client) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) {
- t, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
- if err != nil {
- return containerd.Unknown, err
- }
- s, err := t.Status(ctx)
- if err != nil {
- return containerd.Unknown, wrapError(err)
- }
- return s.Status, nil
+func (t *task) ForceDelete(ctx context.Context) error {
+ _, err := t.Task.Delete(ctx, containerd.WithProcessKill)
+ return wrapError(err)
+}
+
+func (t *task) Status(ctx context.Context) (containerd.Status, error) {
+ s, err := t.Task.Status(ctx)
+ return s, wrapError(err)
+}
+
+func (p process) Status(ctx context.Context) (containerd.Status, error) {
+ s, err := p.Process.Status(ctx)
+ return s, wrapError(err)
}
-func (c *client) getCheckpointOptions(id string, exit bool) containerd.CheckpointTaskOpts {
+func (c *container) getCheckpointOptions(exit bool) containerd.CheckpointTaskOpts {
return func(r *containerd.CheckpointTaskInfo) error {
- if r.Options == nil {
- c.v2runcoptionsMu.Lock()
- _, ok := c.v2runcoptions[id]
- c.v2runcoptionsMu.Unlock()
- if ok {
- r.Options = &v2runcoptions.CheckpointOptions{Exit: exit}
- }
- return nil
+ if r.Options == nil && c.v2runcoptions != nil {
+ r.Options = &v2runcoptions.CheckpointOptions{}
}
switch opts := r.Options.(type) {
@@ -525,27 +417,21 @@ func (c *client) getCheckpointOptions(id string, exit bool) containerd.Checkpoin
}
}
-func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
- p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
- if err != nil {
- return err
- }
-
- opts := []containerd.CheckpointTaskOpts{c.getCheckpointOptions(containerID, exit)}
- img, err := p.(containerd.Task).Checkpoint(ctx, opts...)
+func (t *task) CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error {
+ img, err := t.Task.Checkpoint(ctx, t.ctr.getCheckpointOptions(exit))
if err != nil {
return wrapError(err)
}
// Whatever happens, delete the checkpoint from containerd
defer func() {
- err := c.client.ImageService().Delete(context.Background(), img.Name())
+ err := t.ctr.client.client.ImageService().Delete(ctx, img.Name())
if err != nil {
- c.logger.WithError(err).WithField("digest", img.Target().Digest).
+ t.ctr.client.logger.WithError(err).WithField("digest", img.Target().Digest).
Warnf("failed to delete checkpoint image")
}
}()
- b, err := content.ReadBlob(ctx, c.client.ContentStore(), img.Target())
+ b, err := content.ReadBlob(ctx, t.ctr.client.client.ContentStore(), img.Target())
if err != nil {
return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
}
@@ -566,7 +452,7 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
}
- rat, err := c.client.ContentStore().ReaderAt(ctx, *cpDesc)
+ rat, err := t.ctr.client.client.ContentStore().ReaderAt(ctx, *cpDesc)
if err != nil {
return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
}
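The checkpoint flow above is unchanged apart from reaching the content store through the task's back-pointers (`t.ctr.client.client`). For reference, a self-contained sketch of the read side, assuming a descriptor and a store are already in hand:

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/containerd/containerd/content"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// readIndex pulls an OCI index blob out of the content store and
// decodes it — the same shape of call CreateCheckpoint makes when
// locating the checkpoint descriptor inside the produced image.
func readIndex(ctx context.Context, store content.Provider, desc ocispec.Descriptor) (*ocispec.Index, error) {
	b, err := content.ReadBlob(ctx, store, desc)
	if err != nil {
		return nil, fmt.Errorf("failed to read blob: %w", err)
	}
	var index ocispec.Index
	if err := json.Unmarshal(b, &index); err != nil {
		return nil, fmt.Errorf("invalid index: %w", err)
	}
	return &index, nil
}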
@@ -579,7 +465,8 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
return err
}
-func (c *client) getContainer(ctx context.Context, id string) (containerd.Container, error) {
+// LoadContainer loads the containerd container.
+func (c *client) LoadContainer(ctx context.Context, id string) (libcontainerdtypes.Container, error) {
ctr, err := c.client.LoadContainer(ctx, id)
if err != nil {
if containerderrors.IsNotFound(err) {
@@ -587,42 +474,25 @@ func (c *client) getContainer(ctx context.Context, id string) (containerd.Contai
}
return nil, wrapError(err)
}
- return ctr, nil
+ return &container{client: c, c8dCtr: ctr}, nil
}
-func (c *client) getProcess(ctx context.Context, containerID, processID string) (containerd.Process, error) {
- ctr, err := c.getContainer(ctx, containerID)
- if err != nil {
- return nil, err
- }
- t, err := ctr.Task(ctx, nil)
+func (c *container) Task(ctx context.Context) (libcontainerdtypes.Task, error) {
+ t, err := c.c8dCtr.Task(ctx, nil)
if err != nil {
- if containerderrors.IsNotFound(err) {
- return nil, errors.WithStack(errdefs.NotFound(errors.New("container is not running")))
- }
- return nil, wrapError(err)
- }
- if processID == libcontainerdtypes.InitProcessName {
- return t, nil
- }
- p, err := t.LoadProcess(ctx, processID, nil)
- if err != nil {
- if containerderrors.IsNotFound(err) {
- return nil, errors.WithStack(errdefs.NotFound(errors.New("no such exec")))
- }
return nil, wrapError(err)
}
- return p, nil
+ return c.newTask(t), nil
}
// createIO creates the io to be used by a process
// This needs to get a pointer to the interface as, upon closure, the process may not have been registered yet
-func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
+func (c *container) createIO(fifos *cio.FIFOSet, processID string, stdinCloseSync chan containerd.Process, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
var (
io *cio.DirectIO
err error
)
- io, err = c.newDirectIO(context.Background(), fifos)
+ io, err = c.client.newDirectIO(context.Background(), fifos)
if err != nil {
return nil, err
}
@@ -639,13 +509,13 @@ func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, std
// Do the rest in a new routine to avoid a deadlock if the
// Exec/Start call failed.
go func() {
- <-stdinCloseSync
- p, err := c.getProcess(context.Background(), containerID, processID)
- if err == nil {
- err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
- if err != nil && strings.Contains(err.Error(), "transport is closing") {
- err = nil
- }
+ p, ok := <-stdinCloseSync
+ if !ok {
+ return
+ }
+ err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
+ if err != nil && strings.Contains(err.Error(), "transport is closing") {
+ err = nil
}
}()
})
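With `stdinCloseSync` now a `chan containerd.Process` of capacity 1, the Start/Exec paths hand the freshly created process straight to this goroutine; previously it had to re-fetch the process by ID, costing another RPC. Closing the channel without sending remains the failure signal, observed above as `ok == false`. A self-contained sketch of the handoff, with illustrative names:

package main

import "fmt"

type proc struct{ id string }

func (p *proc) closeStdin() { fmt.Println("stdin closed for", p.id) }

func main() {
	// Capacity 1 so the sender never blocks, even if the receiver
	// is not yet waiting on the channel.
	ready := make(chan *proc, 1)
	done := make(chan struct{})

	go func() {
		defer close(done)
		p, ok := <-ready
		if !ok {
			// Channel closed without a send: startup failed and
			// there is no process whose stdin needs closing.
			return
		}
		p.closeStdin()
	}()

	ready <- &proc{id: "init"} // success path: hand over the process
	<-done
}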
@@ -665,51 +535,12 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
c.eventQ.Append(ei.ContainerID, func() {
err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
if err != nil {
- c.logger.WithError(err).WithFields(logrus.Fields{
+ c.logger.WithContext(ctx).WithError(err).WithFields(logrus.Fields{
"container": ei.ContainerID,
"event": et,
"event-info": ei,
}).Error("failed to process event")
}
-
- if et == libcontainerdtypes.EventExit && ei.ProcessID != ei.ContainerID {
- p, err := c.getProcess(ctx, ei.ContainerID, ei.ProcessID)
- if err != nil {
-
- c.logger.WithError(errors.New("no such process")).
- WithFields(logrus.Fields{
- "error": err,
- "container": ei.ContainerID,
- "process": ei.ProcessID,
- }).Error("exit event")
- return
- }
-
- ctr, err := c.getContainer(ctx, ei.ContainerID)
- if err != nil {
- c.logger.WithFields(logrus.Fields{
- "container": ei.ContainerID,
- "error": err,
- }).Error("failed to find container")
- } else {
- labels, err := ctr.Labels(ctx)
- if err != nil {
- c.logger.WithFields(logrus.Fields{
- "container": ei.ContainerID,
- "error": err,
- }).Error("failed to get container labels")
- return
- }
- newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
- }
- _, err = p.Delete(context.Background())
- if err != nil {
- c.logger.WithError(err).WithFields(logrus.Fields{
- "container": ei.ContainerID,
- "process": ei.ProcessID,
- }).Warn("failed to delete process")
- }
- }
})
}
@@ -767,7 +598,6 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
c.logger.Debug("processing event stream")
for {
- var oomKilled bool
select {
case err = <-errC:
if err != nil {
@@ -825,9 +655,7 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
et = libcontainerdtypes.EventOOM
ei = libcontainerdtypes.EventInfo{
ContainerID: t.ContainerID,
- OOMKilled: true,
}
- oomKilled = true
case *apievents.TaskExecAdded:
et = libcontainerdtypes.EventExecAdded
ei = libcontainerdtypes.EventInfo{
@@ -866,13 +694,6 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
continue
}
- c.oomMu.Lock()
- if oomKilled {
- c.oom[ei.ContainerID] = true
- }
- ei.OOMKilled = c.oom[ei.ContainerID]
- c.oomMu.Unlock()
-
c.processEvent(ctx, et, ei)
}
}
diff --git a/libcontainerd/remote/client_linux.go b/libcontainerd/remote/client_linux.go
index e45d140b2f..dd7aee8fe8 100644
--- a/libcontainerd/remote/client_linux.go
+++ b/libcontainerd/remote/client_linux.go
@@ -20,15 +20,10 @@ func summaryFromInterface(i interface{}) (*libcontainerdtypes.Summary, error) {
return &libcontainerdtypes.Summary{}, nil
}
-func (c *client) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error {
- p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
- if err != nil {
- return err
- }
-
+func (t *task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error {
// go doesn't like the alias in 1.8, this means this needs to be
// platform specific
- return p.(containerd.Task).Update(ctx, containerd.WithResources((*specs.LinuxResources)(resources)))
+ return t.Update(ctx, containerd.WithResources((*specs.LinuxResources)(resources)))
}
func hostIDFromMap(id uint32, mp []specs.LinuxIDMapping) int {
diff --git a/libcontainerd/remote/client_windows.go b/libcontainerd/remote/client_windows.go
index 87fb0e119d..4591124430 100644
--- a/libcontainerd/remote/client_windows.go
+++ b/libcontainerd/remote/client_windows.go
@@ -87,7 +87,7 @@ func (c *client) newDirectIO(ctx context.Context, fifos *cio.FIFOSet) (*cio.Dire
return cio.NewDirectIOFromFIFOSet(ctx, pipes.stdin, pipes.stdout, pipes.stderr, fifos), nil
}
-func (c *client) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error {
+func (t *task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error {
// TODO: (containerd): Not implemented, but don't error.
return nil
}
diff --git a/libcontainerd/replace.go b/libcontainerd/replace.go
new file mode 100644
index 0000000000..6ef6141e98
--- /dev/null
+++ b/libcontainerd/replace.go
@@ -0,0 +1,62 @@
+package libcontainerd // import "github.com/docker/docker/libcontainerd"
+
+import (
+ "context"
+
+ "github.com/containerd/containerd"
+ "github.com/opencontainers/runtime-spec/specs-go"
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+
+ "github.com/docker/docker/errdefs"
+ "github.com/docker/docker/libcontainerd/types"
+)
+
+// ReplaceContainer creates a new container, replacing any existing container
+// with the same id if necessary.
+func ReplaceContainer(ctx context.Context, client types.Client, id string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (types.Container, error) {
+ newContainer := func() (types.Container, error) {
+ return client.NewContainer(ctx, id, spec, shim, runtimeOptions, opts...)
+ }
+ ctr, err := newContainer()
+ if err == nil || !errdefs.IsConflict(err) {
+ return ctr, err
+ }
+
+ log := logrus.WithContext(ctx).WithField("container", id)
+ log.Debug("A container already exists with the same ID. Attempting to clean up the old container.")
+ ctr, err = client.LoadContainer(ctx, id)
+ if err != nil {
+ if errdefs.IsNotFound(err) {
+ // Task failed successfully: the container no longer exists,
+ // despite us not doing anything. May as well try to create
+ // the container again. It might succeed.
+ return newContainer()
+ }
+ return nil, errors.Wrap(err, "could not load stale containerd container object")
+ }
+ tsk, err := ctr.Task(ctx)
+ if err != nil {
+ if errdefs.IsNotFound(err) {
+ goto deleteContainer
+ }
+ // There is no point in trying to delete the container if we
+ // cannot determine whether or not it has a task. The containerd
+ // client would just try to load the task itself, get the same
+ // error, and give up.
+ return nil, errors.Wrap(err, "could not load stale containerd task object")
+ }
+ if err := tsk.ForceDelete(ctx); err != nil {
+ if !errdefs.IsNotFound(err) {
+ return nil, errors.Wrap(err, "could not delete stale containerd task object")
+ }
+ // The task might have exited on its own. Proceed with
+ // attempting to delete the container.
+ }
+deleteContainer:
+ if err := ctr.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
+ return nil, errors.Wrap(err, "could not delete stale containerd container object")
+ }
+
+ return newContainer()
+}
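`ReplaceContainer` gives callers create-or-replace semantics: create, and on a conflict load the stale container, force-delete its task if one exists, delete the container, then create again. A hedged usage sketch against the new interfaces; the caller is assumed to have a configured `types.Client`, spec, shim binary, and stdio callback, and `startFresh` itself is illustrative:

package example

import (
	"context"

	"github.com/docker/docker/libcontainerd"
	"github.com/docker/docker/libcontainerd/types"
	"github.com/opencontainers/runtime-spec/specs-go"
)

// startFresh replaces any stale container with the same ID, then starts
// a task for the new one, cleaning up on failure.
func startFresh(ctx context.Context, client types.Client, id string, spec *specs.Spec, shim string, attach types.StdioCallback) (types.Task, error) {
	ctr, err := libcontainerd.ReplaceContainer(ctx, client, id, spec, shim, nil)
	if err != nil {
		return nil, err
	}
	tsk, err := ctr.Start(ctx, "", false, attach)
	if err != nil {
		_ = ctr.Delete(ctx) // best-effort cleanup of the container we just created
		return nil, err
	}
	return tsk, nil
}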
diff --git a/libcontainerd/types/types.go b/libcontainerd/types/types.go
index 71082f7661..673b184c03 100644
--- a/libcontainerd/types/types.go
+++ b/libcontainerd/types/types.go
@@ -33,7 +33,6 @@ type EventInfo struct {
Pid uint32
ExitCode uint32
ExitedAt time.Time
- OOMKilled bool
Error error
}
@@ -44,32 +43,58 @@ type Backend interface {
// Process of a container
type Process interface {
- Delete(context.Context) (uint32, time.Time, error)
+ // Pid is the system specific process id
+ Pid() uint32
+ // Kill sends the provided signal to the process
+ Kill(ctx context.Context, signal syscall.Signal) error
+ // Resize changes the width and height of the process's terminal
+ Resize(ctx context.Context, width, height uint32) error
+ // Delete removes the process and any allocated resources, returning the exit status
+ Delete(context.Context) (*containerd.ExitStatus, error)
}
// Client provides access to containerd features.
type Client interface {
Version(ctx context.Context) (containerd.Version, error)
+ // LoadContainer loads the metadata for a container from containerd.
+ LoadContainer(ctx context.Context, containerID string) (Container, error)
+ // NewContainer creates a new containerd container.
+ NewContainer(ctx context.Context, containerID string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (Container, error)
+}
- Restore(ctx context.Context, containerID string, attachStdio StdioCallback) (alive bool, pid int, p Process, err error)
-
- Create(ctx context.Context, containerID string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error
- Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio StdioCallback) (pid int, err error)
- SignalProcess(ctx context.Context, containerID, processID string, signal syscall.Signal) error
- Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error)
- ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error
- CloseStdin(ctx context.Context, containerID, processID string) error
- Pause(ctx context.Context, containerID string) error
- Resume(ctx context.Context, containerID string) error
- Stats(ctx context.Context, containerID string) (*Stats, error)
- ListPids(ctx context.Context, containerID string) ([]uint32, error)
- Summary(ctx context.Context, containerID string) ([]Summary, error)
- DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error)
- Delete(ctx context.Context, containerID string) error
- Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error)
+// Container provides access to a containerd container.
+type Container interface {
+ Start(ctx context.Context, checkpointDir string, withStdin bool, attachStdio StdioCallback) (Task, error)
+ Task(ctx context.Context) (Task, error)
+ // AttachTask returns the current task for the container and reattaches
+ // to the IO for the running task. If no task exists for the container
+ // a NotFound error is returned.
+ //
+ // Clients must make sure that only one reader is attached to the task.
+ AttachTask(ctx context.Context, attachStdio StdioCallback) (Task, error)
+ // Delete removes the container and associated resources
+ Delete(context.Context) error
+}
- UpdateResources(ctx context.Context, containerID string, resources *Resources) error
- CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error
+// Task provides access to a running containerd container.
+type Task interface {
+ Process
+ // Pause suspends the execution of the task
+ Pause(context.Context) error
+ // Resume the execution of the task
+ Resume(context.Context) error
+ Stats(ctx context.Context) (*Stats, error)
+ // Pids returns a list of system specific process ids inside the task
+ Pids(context.Context) ([]containerd.ProcessInfo, error)
+ Summary(ctx context.Context) ([]Summary, error)
+ // ForceDelete forcefully kills the task's processes and deletes the task
+ ForceDelete(context.Context) error
+ // Status returns the executing status of the task
+ Status(ctx context.Context) (containerd.Status, error)
+ // Exec creates and starts a new process inside the task
+ Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (Process, error)
+ UpdateResources(ctx context.Context, resources *Resources) error
+ CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error
}
// StdioCallback is called to connect a container or process stdio.
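The flat, ID-keyed Client interface becomes a small object hierarchy: a Client loads or creates Containers, each Container owns at most one Task, and a Task can spawn exec Processes. A sketch of how a caller walks the new interfaces (assumes a connected `types.Client`; error semantics as documented above):

package example

import (
	"context"
	"syscall"

	"github.com/docker/docker/libcontainerd/types"
)

// signalInit resolves the container's current task and signals it.
// Each handle is obtained once and reused, instead of passing string
// IDs into every call as the old interface required.
func signalInit(ctx context.Context, client types.Client, id string) error {
	ctr, err := client.LoadContainer(ctx, id)
	if err != nil {
		return err
	}
	tsk, err := ctr.Task(ctx)
	if err != nil {
		return err // a NotFound error here means the container is not running
	}
	return tsk.Kill(ctx, syscall.SIGTERM)
}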
diff --git a/plugin/executor/containerd/containerd.go b/plugin/executor/containerd/containerd.go
index 92983056d0..0327e65dc4 100644
--- a/plugin/executor/containerd/containerd.go
+++ b/plugin/executor/containerd/containerd.go
@@ -2,6 +2,7 @@ package containerd // import "github.com/docker/docker/plugin/executor/container
import (
"context"
+ "fmt"
"io"
"sync"
"syscall"
@@ -28,6 +29,7 @@ func New(ctx context.Context, rootDir string, cli *containerd.Client, ns string,
rootDir: rootDir,
exitHandler: exitHandler,
runtime: runtime,
+ plugins: make(map[string]*c8dPlugin),
}
client, err := libcontainerd.NewClient(ctx, cli, rootDir, ns, e)
@@ -44,77 +46,112 @@ type Executor struct {
client libcontainerdtypes.Client
exitHandler ExitHandler
runtime types.Runtime
+
+ mu sync.Mutex // Guards plugins map
+ plugins map[string]*c8dPlugin
+}
+
+type c8dPlugin struct {
+ log *logrus.Entry
+ ctr libcontainerdtypes.Container
+ tsk libcontainerdtypes.Task
}
// deleteTaskAndContainer deletes the plugin task and then the plugin container from containerd
-func deleteTaskAndContainer(ctx context.Context, cli libcontainerdtypes.Client, id string, p libcontainerdtypes.Process) {
- if p != nil {
- if _, _, err := p.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
- logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
- }
- } else {
- if _, _, err := cli.DeleteTask(ctx, id); err != nil && !errdefs.IsNotFound(err) {
- logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
+func (p c8dPlugin) deleteTaskAndContainer(ctx context.Context) {
+ if p.tsk != nil {
+ if _, err := p.tsk.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
+ p.log.WithError(err).Error("failed to delete plugin task from containerd")
}
}
-
- if err := cli.Delete(ctx, id); err != nil && !errdefs.IsNotFound(err) {
- logrus.WithError(err).WithField("id", id).Error("failed to delete plugin container from containerd")
+ if p.ctr != nil {
+ if err := p.ctr.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
+ p.log.WithError(err).Error("failed to delete plugin container from containerd")
+ }
}
}
// Create creates a new container
func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
ctx := context.Background()
- err := e.client.Create(ctx, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts)
+ log := logrus.WithField("plugin", id)
+ ctr, err := libcontainerd.ReplaceContainer(ctx, e.client, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts)
if err != nil {
- status, err2 := e.client.Status(ctx, id)
- if err2 != nil {
- if !errdefs.IsNotFound(err2) {
- logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to read plugin status")
- }
- } else {
- if status != containerd.Running && status != containerd.Unknown {
- if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
- logrus.WithError(err2).WithField("plugin", id).Error("Error cleaning up containerd container")
- }
- err = e.client.Create(ctx, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts)
- }
- }
-
- if err != nil {
- return errors.Wrap(err, "error creating containerd container")
- }
+ return errors.Wrap(err, "error creating containerd container for plugin")
}
- _, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
+ p := c8dPlugin{log: log, ctr: ctr}
+ p.tsk, err = ctr.Start(ctx, "", false, attachStreamsFunc(stdout, stderr))
if err != nil {
- deleteTaskAndContainer(ctx, e.client, id, nil)
+ p.deleteTaskAndContainer(ctx)
+ return err
}
- return err
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ e.plugins[id] = &p
+ return nil
}
// Restore restores a container
func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
- alive, _, p, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr))
- if err != nil && !errdefs.IsNotFound(err) {
+ ctx := context.Background()
+ p := c8dPlugin{log: logrus.WithField("plugin", id)}
+ ctr, err := e.client.LoadContainer(ctx, id)
+ if err != nil {
+ if errdefs.IsNotFound(err) {
+ return false, nil
+ }
return false, err
}
- if !alive {
- deleteTaskAndContainer(context.Background(), e.client, id, p)
+ p.tsk, err = ctr.AttachTask(ctx, attachStreamsFunc(stdout, stderr))
+ if err != nil {
+ if errdefs.IsNotFound(err) {
+ p.deleteTaskAndContainer(ctx)
+ return false, nil
+ }
+ return false, err
}
- return alive, nil
+ s, err := p.tsk.Status(ctx)
+ if err != nil {
+ if errdefs.IsNotFound(err) {
+ // Task vanished after attaching?
+ p.tsk = nil
+ p.deleteTaskAndContainer(ctx)
+ return false, nil
+ }
+ return false, err
+ }
+ if s.Status == containerd.Stopped {
+ p.deleteTaskAndContainer(ctx)
+ return false, nil
+ }
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ e.plugins[id] = &p
+ return true, nil
}
// IsRunning reports whether the container with the given id is running
func (e *Executor) IsRunning(id string) (bool, error) {
- status, err := e.client.Status(context.Background(), id)
- return status == containerd.Running, err
+ e.mu.Lock()
+ p := e.plugins[id]
+ e.mu.Unlock()
+ if p == nil {
+ return false, errdefs.NotFound(fmt.Errorf("unknown plugin %q", id))
+ }
+ status, err := p.tsk.Status(context.Background())
+ return status.Status == containerd.Running, err
}
// Signal sends the specified signal to the container
func (e *Executor) Signal(id string, signal syscall.Signal) error {
- return e.client.SignalProcess(context.Background(), id, libcontainerdtypes.InitProcessName, signal)
+ e.mu.Lock()
+ p := e.plugins[id]
+ e.mu.Unlock()
+ if p == nil {
+ return errdefs.NotFound(fmt.Errorf("unknown plugin %q", id))
+ }
+ return p.tsk.Kill(context.Background(), signal)
}
// ProcessEvent handles events from containerd
@@ -122,7 +159,14 @@ func (e *Executor) Signal(id string, signal syscall.Signal) error {
func (e *Executor) ProcessEvent(id string, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) error {
switch et {
case libcontainerdtypes.EventExit:
- deleteTaskAndContainer(context.Background(), e.client, id, nil)
+ e.mu.Lock()
+ p := e.plugins[id]
+ e.mu.Unlock()
+ if p == nil {
+ logrus.WithField("id", id).Warn("Received exit event for an unknown plugin")
+ } else {
+ p.deleteTaskAndContainer(context.Background())
+ }
return e.exitHandler.HandleExitEvent(ei.ContainerID)
}
return nil
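Since the client no longer resolves plugins by ID on every call, the executor caches a handle per plugin in the mutex-guarded `plugins` map and answers `IsRunning`/`Signal` from it. A generic sketch of that lookup pattern, with illustrative names:

package main

import (
	"fmt"
	"sync"
)

type handle struct{ running bool }

type registry struct {
	mu      sync.Mutex // guards handles
	handles map[string]*handle
}

// get returns the cached handle for id, or a not-found error, mirroring
// how the executor resolves plugins before acting on their tasks.
func (r *registry) get(id string) (*handle, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if h, ok := r.handles[id]; ok {
		return h, nil
	}
	return nil, fmt.Errorf("unknown plugin %q", id)
}

func main() {
	r := &registry{handles: map[string]*handle{"p1": {running: true}}}
	h, err := r.get("p1")
	fmt.Println(h, err)
}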