Public release of katbox

This commit is contained in:
Renán Del Valle 2021-04-12 12:00:01 -07:00
commit 4c764e09a4
46 changed files with 4646 additions and 0 deletions

58
pkg/katbox/README.md Normal file
View file

@ -0,0 +1,58 @@
# CSI Hostpath driver
## Usage:
### Build hostpathplugin
```
$ make hostpath
```
### Start Hostpath driver
```
$ sudo ./_output/hostpathplugin --endpoint tcp://127.0.0.1:10000 --nodeid CSINode -v=5
```
### Test using csc
Get the `csc` tool from https://github.com/rexray/gocsi/tree/master/csc
#### Get plugin info
```
$ csc identity plugin-info --endpoint tcp://127.0.0.1:10000
"csi-hostpath" "0.1.0"
```
#### Create a volume
```
$ csc controller new --endpoint tcp://127.0.0.1:10000 --cap 1,block CSIVolumeName
CSIVolumeID
```
#### Delete a volume
```
$ csc controller del --endpoint tcp://127.0.0.1:10000 CSIVolumeID
CSIVolumeID
```
#### Validate volume capabilities
```
$ csc controller validate-volume-capabilities --endpoint tcp://127.0.0.1:10000 --cap 1,block CSIVolumeID
CSIVolumeID true
```
#### NodePublish a volume
```
$ csc node publish --endpoint tcp://127.0.0.1:10000 --cap 1,block --target-path /mnt/hostpath CSIVolumeID
CSIVolumeID
```
#### NodeUnpublish a volume
```
$ csc node unpublish --endpoint tcp://127.0.0.1:10000 --target-path /mnt/hostpath CSIVolumeID
CSIVolumeID
```
#### Get NodeID
```
$ csc node get-id --endpoint tcp://127.0.0.1:10000
CSINode
```

191
pkg/katbox/gc.go Normal file
View file

@ -0,0 +1,191 @@
/*
Copyright 2020 PayPal.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package katbox
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/ricochet2200/go-disk-usage/du"
bolt "go.etcd.io/bbolt"
"github.com/golang/glog"
)
// deletedVolumes tracks volumes that are waiting to be removed from disk.
// Each entry is kept both in the in-memory candidates map and in the bolt
// database referenced by storage.
type deletedVolumes struct {
// candidates maps a volume ID to its pending deletion record.
candidates map[string]*deletionCandidate
// storage is the bolt database that deletion records are persisted to.
storage *bolt.DB
// lock guards access to candidates.
lock sync.RWMutex
}
// deletionCandidate describes a single volume queued for deletion.
// It is serialized as JSON when written to persistent storage.
type deletionCandidate struct {
// Time is the moment the volume was queued for deletion.
Time time.Time `json:"deleteTime"`
// Lifespan is how long the volume may stay on disk after Time; prune scales
// it by the current disk pressure factor before comparing against the clock.
Lifespan time.Duration `json:"lifespan"`
// Path is the location on disk that is removed once the candidate expires.
Path string `json:"path"`
}
// periodicCleanup runs prune repeatedly, pausing for interval between rounds,
// until done is closed.
//
// On exit it closes the persistent storage and then calls wg.Done so the
// caller can wait for the cleanup to finish.
//
// The pause between rounds is interruptible: closing done wakes the loop
// immediately instead of making shutdown wait for a full interval to elapse.
//
// Parameters:
//   - done: closed by the caller to request shutdown.
//   - interval: time to wait between prune rounds.
//   - wg: wait group that is marked done once storage has been closed.
//   - headroom: forwarded to prune for the pressure factor calculation.
//   - workdir: forwarded to prune; the directory whose disk usage is inspected.
func (d *deletedVolumes) periodicCleanup(
	done <-chan struct{},
	interval time.Duration,
	wg *sync.WaitGroup,
	headroom float64,
	workdir string,
) {
	// Deferred calls run in reverse order: storage is closed first, then the
	// waiter is released.
	defer wg.Done()
	defer func() {
		if err := d.storage.Close(); err != nil {
			glog.Info("unable to close persistent storage ", err)
		}
	}()

	for {
		// Do not start another prune round once shutdown has been requested.
		select {
		case <-done:
			return
		default:
		}

		d.prune(workdir, headroom)

		// Wait for the next round, but react to shutdown right away.
		select {
		case <-done:
			return
		case <-time.After(interval):
		}
	}
}
// queue registers vol as a deletion candidate under id.
//
// If id is already queued the call is a no-op. Otherwise the candidate is
// first written to persistent storage (write-ahead) and only then added to the
// in-memory map, so a record in memory always has a persisted counterpart.
// Failures to persist are logged and the candidate is not queued.
func (d *deletedVolumes) queue(id string, vol deletionCandidate) {
	// Check if an entry for deletion already exists.
	d.lock.RLock()
	_, found := d.candidates[id]
	d.lock.RUnlock()
	if found {
		return
	}

	// Write ahead: persist the volume that will be entering our deletion queue.
	err := d.storage.Update(func(tx *bolt.Tx) error {
		bucket := tx.Bucket([]byte(deletedVolumesBucketName))
		if bucket == nil {
			return fmt.Errorf("bucket %s does not exist", deletedVolumesBucketName)
		}

		marshaledVol, err := json.Marshal(vol)
		if err != nil {
			return fmt.Errorf("unable to serialize deletion candidate: %w", err)
		}

		if err := bucket.Put([]byte(id), marshaledVol); err != nil {
			return fmt.Errorf("unable to insert volume into database: %w", err)
		}
		return nil
	})
	if err != nil {
		// Use explicit verbs: the id and path are data, not format strings.
		glog.Infof("failed to persist %s at %s: %v", id, vol.Path, err)
		return
	}

	d.lock.Lock()
	defer d.lock.Unlock()
	d.candidates[id] = &vol
}
// remove drops the deletion candidate identified by id.
//
// The record is deleted from persistent storage first; only when that succeeds
// is it removed from the in-memory map. Failures are logged and leave the
// in-memory entry in place.
func (d *deletedVolumes) remove(id string) {
	err := d.storage.Update(func(tx *bolt.Tx) error {
		bucket := tx.Bucket([]byte(deletedVolumesBucketName))
		// Guard against a missing bucket, as queue does, instead of
		// dereferencing a nil bucket.
		if bucket == nil {
			return fmt.Errorf("bucket %s does not exist", deletedVolumesBucketName)
		}

		if err := bucket.Delete([]byte(id)); err != nil {
			return fmt.Errorf("unable to delete %s from permanent storage: %w", id, err)
		}
		return nil
	})
	if err != nil {
		glog.Infof("failed to remove %s: %v", id, err)
		return
	}

	d.lock.Lock()
	defer d.lock.Unlock()
	delete(d.candidates, id)
}
// prune performs one garbage collection round over the queued deletion
// candidates.
//
// For every candidate:
//   - if its path no longer exists, the candidate is dropped from the queue;
//   - otherwise, if the current time is past Time + Lifespan*pressureFactor,
//     the path is removed from disk and the candidate is dropped.
//
// After a candidate is handled, the parent directory of its path is removed
// as well if it is empty.
//
// Parameters:
//   - workdir: directory whose underlying storage utilization is inspected.
//   - headroom: forwarded to pressureFactor together with the disk size and
//     free space.
func (d *deletedVolumes) prune(workdir string, headroom float64) {
	// Take a snapshot of the candidates map under the read lock. The snapshot
	// is a shallow copy (the map is copied, the records are shared); iterating
	// over it lets remove() take the write lock without deadlocking.
	d.lock.RLock()
	snapshot := make(map[string]*deletionCandidate, len(d.candidates))
	for id, vol := range d.candidates {
		snapshot[id] = vol
	}
	d.lock.RUnlock()

	glog.V(2).Info("number of volumes queued for deletion: ", len(snapshot))

	// Only get the time once since this results in a syscall.
	// This may mean that some volumes may need to wait until next cycle to be pruned.
	currentTime := time.Now()

	// Determine pressure factor based on underlying storage utilization.
	diskUsage := du.NewDiskUsage(workdir)
	factor, err := pressureFactor(diskUsage.Size(), diskUsage.Free(), headroom)
	if err != nil {
		glog.Info("error calculating pressure factor, setting pressure factor to default value of 0.10 ", err)
		factor = 0.1
	}
	glog.Info("disk pressure factor being used for this prune round: ", factor)

	for id, vol := range snapshot {
		if vol == nil {
			continue
		}

		if glog.V(5) {
			glog.Infof("Deletion candidate volume ID: %v\n%+v", id, vol)
		}

		// Short circuit if the path doesn't exist: there is nothing left to
		// delete, so just forget about the candidate.
		if _, err := os.Stat(vol.Path); os.IsNotExist(err) {
			glog.Infof("removing %v from queue as path %v does not exist", id, vol.Path)
			// Best-effort cleanup of the (possibly empty) parent directory.
			// Skipped for an empty path, whose parent would resolve to ".".
			if vol.Path != "" {
				_ = os.Remove(filepath.Dir(vol.Path))
			}
			d.remove(id)
			continue
		}

		// Check to see if the current time has passed the point when we need to
		// evict this volume from the underlying storage. That point in time is a
		// combination of the pressure factor and the configured afterlife duration.
		deadline := vol.Time.Add(time.Duration(float64(vol.Lifespan) * factor))
		if !currentTime.After(deadline) {
			continue
		}

		if err := os.RemoveAll(vol.Path); err != nil {
			glog.Infof("unable to delete %s at %s: %v", id, vol.Path, err)
			continue
		}

		// Attempt to remove PodUUID directory if empty.
		// We ignore the error here because this will correctly fail when a pod with multiple katbox volumes
		// attempts to delete the parent directory. Only the last remaining volume being deleted should succeed.
		_ = os.Remove(filepath.Dir(vol.Path))

		glog.Infof("deleted %s at %s", id, vol.Path)
		d.remove(id)
	}
}

48
pkg/katbox/gc_test.go Normal file
View file

@ -0,0 +1,48 @@
/*
Copyright 2020 PayPal.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package katbox
import (
"sync"
"testing"
"time"
)
func TestDelete(t *testing.T) {
deleteQueue := deletedVolumes{candidates: make(map[string]*deletionCandidate), lock: sync.RWMutex{}}
deleteQueue.queue("volume1", deletionCandidate{
Time: time.Now(),
Lifespan: time.Second * 5,
Path: "/doesnt/exist",
})
deleteQueue.queue("vol2", deletionCandidate{
Time: time.Now(),
Lifespan: time.Second * 1,
Path: "/doesnt/exist2",
})
deleteQueue.queue("vol3", deletionCandidate{
Time: time.Now(),
Lifespan: time.Second * 3,
Path: "/doesnt/exist3",
})
for len(deleteQueue.candidates) > 0 {
deleteQueue.prune()
time.Sleep(time.Second * 1)
}
}

View file

@ -0,0 +1,80 @@
/*
Copyright 2017 The Kubernetes Authors.
Modifications copyright 2020 PayPal.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package katbox
import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// identityServer answers the CSI identity RPCs using a fixed driver name and
// version.
type identityServer struct {
// name is the driver name reported by GetPluginInfo.
name string
// version is the vendor version reported by GetPluginInfo.
version string
}
// NewIdentityServer builds an identity server that reports the given driver
// name and version.
func NewIdentityServer(name, version string) *identityServer {
	ids := new(identityServer)
	ids.name = name
	ids.version = version
	return ids
}
// GetPluginInfo reports the driver name and vendor version.
// It returns an Unavailable status when either value has not been configured.
func (ids *identityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
	glog.V(5).Infof("Using default GetPluginInfo")

	switch {
	case ids.name == "":
		return nil, status.Error(codes.Unavailable, "Driver name not configured")
	case ids.version == "":
		return nil, status.Error(codes.Unavailable, "Driver is missing version")
	}

	info := &csi.GetPluginInfoResponse{
		Name:          ids.name,
		VendorVersion: ids.version,
	}
	return info, nil
}
// Probe always returns an empty response and a nil error; no health check is
// performed.
func (ids *identityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
return &csi.ProbeResponse{}, nil
}
// GetPluginCapabilities returns the fixed list of plugin capabilities: one
// service capability with the zero-value type and one advertising
// VOLUME_ACCESSIBILITY_CONSTRAINTS.
func (ids *identityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
	glog.V(5).Infof("Using default capabilities")

	// serviceCap wraps a service description into a plugin capability entry.
	serviceCap := func(svc *csi.PluginCapability_Service) *csi.PluginCapability {
		return &csi.PluginCapability{
			Type: &csi.PluginCapability_Service_{Service: svc},
		}
	}

	caps := []*csi.PluginCapability{
		serviceCap(&csi.PluginCapability_Service{}),
		serviceCap(&csi.PluginCapability_Service{
			Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS,
		}),
	}

	return &csi.GetPluginCapabilitiesResponse{Capabilities: caps}, nil
}

118
pkg/katbox/katbox.go Normal file
View file

@ -0,0 +1,118 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package katbox
import (
"errors"
"fmt"
"os"
"sync"
"time"
"github.com/golang/glog"
)
// katbox bundles the driver configuration with the gRPC service
// implementations it serves.
type katbox struct {
// name is the driver name.
name string
// nodeID identifies the node this driver instance runs on.
nodeID string
// version is the vendor version of the driver.
version string
// endpoint is the address the gRPC server listens on.
endpoint string
// pruneInterval is the pause between rounds of the deleted-volume pruner.
pruneInterval time.Duration
// headroom is handed to the pruner for its disk pressure calculation.
headroom float64
// idServer implements the CSI identity service.
idServer *identityServer
// nodeServer implements the CSI node service.
nodeServer *nodeServer
}
// volume is the record kept for a volume created by this driver. It is
// serialized as JSON when persisted.
type volume struct {
// Name is the human-readable volume name.
Name string `json:"name"`
// ID is the volume ID.
ID string `json:"id"`
// PodUUID is the UID of the pod the volume was created for; it is part of the
// volume's on-disk path.
PodUUID string `json:"podUUID"`
// Size is the capacity recorded for the volume.
Size int64 `json:"size"`
// Path is the location of the volume on disk.
Path string `json:"path"`
// AccessType records whether the volume is a mount or a block volume.
AccessType accessType `json:"accessType"`
// ParentVolID is omitted from JSON when empty.
// NOTE(review): nothing visible in this package sets it — confirm whether it is still used.
ParentVolID string `json:"parentVolID,omitempty"`
// Ephemeral is true for ephemeral volumes.
Ephemeral bool `json:"ephemeral"`
}
var (
// vendorVersion is the driver version; it defaults to "dev" and is
// overwritten by NewKatboxDriver when a non-empty version is supplied.
vendorVersion = "dev"
)
// NewKatboxDriver validates the supplied configuration, creates the working
// directory, and assembles a driver ready to be started with Run.
//
// Parameters:
//   - driverName, nodeID, endpoint: required; an error is returned when any is empty.
//   - workdir: directory that holds volumes and the persistent state; created if missing.
//   - maxVolumesPerNode: forwarded to the node.
//   - afterlifeSpan: forwarded to the node as the lifespan of deleted volumes.
//   - deleteInterval: pause between pruner rounds.
//   - headroom: forwarded to the pruner.
//   - version: vendor version; when empty the package default is kept.
//
// It returns an error when a required argument is missing, when the working
// directory cannot be created, or when the node state cannot be initialized.
func NewKatboxDriver(
	driverName, nodeID, endpoint, workdir string,
	maxVolumesPerNode int64,
	afterlifeSpan, deleteInterval time.Duration,
	headroom float64,
	version string) (*katbox, error) {
	if driverName == "" {
		return nil, errors.New("no driver name provided")
	}

	if nodeID == "" {
		return nil, errors.New("no node id provided")
	}

	if endpoint == "" {
		return nil, errors.New("no driver endpoint provided")
	}

	if version != "" {
		vendorVersion = version
	}

	if err := os.MkdirAll(workdir, 0750); err != nil {
		return nil, fmt.Errorf("failed to create working directory: %w", err)
	}

	glog.Infof("Driver: %v ", driverName)
	glog.Infof("Version: %s", vendorVersion)

	// NewNode reports failure by returning nil; surface that here rather than
	// handing back a driver that cannot run.
	n := NewNode(nodeID, workdir, maxVolumesPerNode, afterlifeSpan)
	if n == nil {
		return nil, errors.New("failed to initialize node state")
	}

	return &katbox{
		name:          driverName,
		version:       vendorVersion,
		nodeID:        nodeID,
		endpoint:      endpoint,
		pruneInterval: deleteInterval,
		headroom:      headroom,
		// Use the effective version: the raw argument may be empty, which would
		// make GetPluginInfo fail with "Driver is missing version".
		idServer:   NewIdentityServer(driverName, vendorVersion),
		nodeServer: &nodeServer{node: n},
	}, nil
}
// Run starts the gRPC server and the deleted-volume pruner, then blocks until
// the gRPC server is done. Afterwards it signals the pruner to stop and waits
// for it to finish its cleanup.
//
// If the identity server, the node server, or the node is missing, Run logs a
// message and returns without starting anything.
func (k *katbox) Run() {
	ready := k.idServer != nil && k.nodeServer != nil && k.nodeServer.node != nil
	if !ready {
		glog.V(1).Infof("unable to create server")
		return
	}

	// Bring up the gRPC endpoint serving the identity and node services.
	grpcServer := NewNonBlockingGRPCServer()
	grpcServer.Start(k.endpoint, k.idServer, k.nodeServer)

	// Launch the pruner in the background.
	stopPruner := make(chan struct{})
	var prunerDone sync.WaitGroup
	prunerDone.Add(1)
	go k.nodeServer.node.deletedVolumes.periodicCleanup(stopPruner, k.pruneInterval, &prunerDone, k.headroom, k.nodeServer.node.workdir)

	// Block until the identity and node servers have shut down.
	grpcServer.Wait()

	// Tell the pruner to wind down, then wait for it to report completion.
	close(stopPruner)
	prunerDone.Wait()
}

230
pkg/katbox/node.go Normal file
View file

@ -0,0 +1,230 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package katbox
import (
"encoding/json"
"errors"
"fmt"
"os"
"path"
"sync"
"time"
"github.com/golang/glog"
bolt "go.etcd.io/bbolt"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
utilexec "k8s.io/utils/exec"
)
// node holds the per-node state of the driver: the known volumes, the queue of
// volumes awaiting deletion, and the handle to persistent storage.
type node struct {
// id is the node identifier reported by NodeGetInfo.
id string
// volumes maps a volume ID to its record.
volumes map[string]volume
// deletedVolumes is the queue of volumes waiting to be removed from disk.
deletedVolumes deletedVolumes
// workdir is the directory under which volumes and the database file live.
workdir string
// afterLifespan is the lifespan assigned to volumes when they are queued for deletion.
afterLifespan time.Duration
// maxVolumes is the per-node volume limit reported by NodeGetInfo.
maxVolumes int64
// storage is the bolt database used to persist volume records; NewNode hands
// the same handle to deletedVolumes.
storage *bolt.DB
}
// NewNode opens (or creates) the persistent database in workdir, loads the
// previously persisted volumes and deletion candidates into memory, and returns
// the resulting node.
//
// It returns nil when the database cannot be initialized or the persisted state
// cannot be loaded. In the latter case the database handle is closed before
// returning so the file lock is not left held.
func NewNode(id, workdir string, maxVolumes int64, afterLifespan time.Duration) *node {
	db, err := initializePermanentStorage(
		path.Join(workdir, "deletedVolumes.db"),
		deletedVolumesBucketName,
		volumesBucketName)
	if err != nil {
		return nil
	}

	// abort releases the database handle on a failed load and yields the nil
	// node that signals failure to the caller.
	abort := func() *node {
		if closeErr := db.Close(); closeErr != nil {
			glog.Errorf("unable to close persistent storage: %v", closeErr)
		}
		return nil
	}

	candidates, err := loadDeletedVolumesFromPersistent(db, deletedVolumesBucketName)
	if err != nil {
		return abort()
	}

	volumes, err := loadVolumesFromPersistent(db, volumesBucketName)
	if err != nil {
		return abort()
	}

	glog.V(4).Infof("loaded %d volume records into memory", len(volumes))

	return &node{
		id:      id,
		volumes: volumes,
		deletedVolumes: deletedVolumes{
			candidates: candidates,
			lock:       sync.RWMutex{},
			storage:    db,
		},
		workdir:       workdir,
		afterLifespan: afterLifespan,
		maxVolumes:    maxVolumes,
		storage:       db,
	}
}
// initializePermanentStorage opens the bolt database at dbFilename (waiting up
// to 10 seconds for the file lock) and makes sure every bucket in bucketNames
// exists.
//
// It returns the open database, or an error when the file cannot be opened or a
// bucket cannot be created. On a bucket creation failure the database is closed
// again before returning.
func initializePermanentStorage(dbFilename string, bucketNames ...string) (*bolt.DB, error) {
	db, err := bolt.Open(dbFilename, 0600, &bolt.Options{Timeout: 10 * time.Second})
	if err != nil {
		glog.V(4).Infof("unable to open persistent storage: %s", err)
		return nil, err
	}

	err = db.Update(func(tx *bolt.Tx) error {
		for _, name := range bucketNames {
			if _, err := tx.CreateBucketIfNotExists([]byte(name)); err != nil {
				return fmt.Errorf("unable to create bucket: %w", err)
			}
		}
		return nil
	})
	if err != nil {
		glog.V(4).Infof("unable to create bucket for storage: %s", err)
		// Do not leak the open handle (and its file lock) on failure.
		if closeErr := db.Close(); closeErr != nil {
			glog.V(4).Infof("unable to close persistent storage: %s", closeErr)
		}
		return nil, err
	}

	return db, nil
}
// loadDeletedVolumesFromPersistent reads every record in the bucket named
// bucketName and returns the deletion candidates keyed by volume ID.
//
// Records that cannot be decoded are logged and skipped, so a zero-valued
// candidate (with an empty path) never enters the deletion queue. The skipped
// record is left untouched in the database.
//
// It returns an error when db is nil or the bucket does not exist.
func loadDeletedVolumesFromPersistent(db *bolt.DB, bucketName string) (map[string]*deletionCandidate, error) {
	if db == nil {
		return nil, errors.New("database has not been initialized")
	}

	// Load list of volumes to be deleted from the persistent layer.
	candidates := make(map[string]*deletionCandidate)
	err := db.View(func(tx *bolt.Tx) error {
		bucket := tx.Bucket([]byte(bucketName))
		if bucket == nil {
			return fmt.Errorf("bucket %s doesn't exist", bucketName)
		}

		c := bucket.Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			id := string(k)

			var candidate deletionCandidate
			if err := json.Unmarshal(v, &candidate); err != nil {
				glog.V(1).Infof("unable to load volume %s: %s", id, err)
				continue
			}

			glog.V(4).Infof("loaded volume %s info into memory", id)
			candidates[id] = &candidate
		}
		return nil
	})
	if err != nil {
		glog.Info("unable load persistent layer into memory ", err)
		return nil, err
	}

	return candidates, nil
}
// loadVolumesFromPersistent reads every record in the bucket named bucketName
// and returns the volumes keyed by volume ID.
//
// Records that cannot be decoded are logged and skipped rather than being
// stored as zero-valued volumes. The skipped record is left untouched in the
// database.
//
// It returns an error when db is nil or the bucket does not exist.
func loadVolumesFromPersistent(db *bolt.DB, bucketName string) (map[string]volume, error) {
	if db == nil {
		return nil, errors.New("database has not been initialized")
	}

	// Load list of known volumes from the persistent layer.
	volumes := make(map[string]volume)
	err := db.View(func(tx *bolt.Tx) error {
		bucket := tx.Bucket([]byte(bucketName))
		if bucket == nil {
			return fmt.Errorf("bucket %s doesn't exist", bucketName)
		}

		c := bucket.Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			id := string(k)

			var vol volume
			if err := json.Unmarshal(v, &vol); err != nil {
				glog.V(1).Infof("unable to load volume %s: %s", id, err)
				continue
			}

			glog.V(4).Infof("loaded volume %s info into memory", id)
			volumes[id] = vol
		}
		return nil
	})
	if err != nil {
		glog.Info("unable load persistent layer into memory: ", err)
		return nil, err
	}

	return volumes, nil
}
// createEphemeralVolume provisions the on-disk backing for a katbox volume and
// records the volume in the node's in-memory volume map.
//
// For mount access a directory is created at the volume path. For block access
// a file of the requested size is allocated with fallocate and attached to a
// loop device; the file is removed again if the attach fails.
//
// It returns a pointer to the new volume record, or an error when provisioning
// fails or the access type is not supported.
func (n *node) createEphemeralVolume(volID, podUUID, name string, cap int64, volAccessType accessType) (*volume, error) {
	volPath := fullpath(n.workdir, podUUID, volID)

	switch volAccessType {
	case mountAccess:
		if mkErr := os.MkdirAll(volPath, 0777); mkErr != nil {
			return nil, mkErr
		}

	case blockAccess:
		// Allocate the backing file for the block device.
		sizeArg := fmt.Sprintf("%dM", cap/mib)
		output, cmdErr := utilexec.New().Command("fallocate", "-l", sizeArg, volPath).CombinedOutput()
		if cmdErr != nil {
			return nil, fmt.Errorf("failed to create block device: %v, %v", cmdErr, string(output))
		}

		// Bind the backing file to a loop device.
		handler := volumepathhandler.VolumePathHandler{}
		if _, attachErr := handler.AttachFileDevice(volPath); attachErr != nil {
			// The backing file is useless without a loop device; clean it up.
			if rmErr := os.Remove(volPath); rmErr != nil {
				glog.Errorf("failed to cleanup block file %s: %v", volPath, rmErr)
			}
			return nil, fmt.Errorf("failed to attach device %v: %v", volPath, attachErr)
		}

	default:
		return nil, fmt.Errorf("unsupported access type %v", volAccessType)
	}

	created := volume{
		Name:       name,
		ID:         volID,
		PodUUID:    podUUID,
		Size:       cap,
		Path:       volPath,
		AccessType: volAccessType,
		Ephemeral:  true,
	}
	n.volumes[volID] = created
	return &created, nil
}
// volumeByID looks up the volume record stored under id.
// It returns an empty volume and an error when no such record exists.
func (n *node) volumeByID(id string) (volume, error) {
	vol, found := n.volumes[id]
	if !found {
		return volume{}, fmt.Errorf("volume id %s does not exist in the volumes list", id)
	}
	return vol, nil
}

358
pkg/katbox/nodeserver.go Normal file
View file

@ -0,0 +1,358 @@
/*
Copyright 2017 The Kubernetes Authors.
Modifications copyright 2020 PayPal.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package katbox
import (
"encoding/json"
"fmt"
"os"
"strings"
"time"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"golang.org/x/net/context"
bolt "go.etcd.io/bbolt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
"k8s.io/mount-utils"
)
// TopologyKeyNode is the topology segment key under which NodeGetInfo reports
// the node ID.
const TopologyKeyNode = "topology.katbox.csi/node"
// nodeServer implements the CSI node service on top of a node.
type nodeServer struct {
// node holds the volume state and persistent storage the RPCs operate on.
node *node
}
// NodePublishVolume creates the ephemeral volume for the request and bind
// mounts it at the requested target path.
//
// The request must carry a volume capability, a volume ID, a target path, and
// a volume context marking the volume as ephemeral; otherwise InvalidArgument
// is returned. A request that specifies both block and mount access is
// rejected as well.
//
// If the target path is already a mount point the call succeeds without
// mounting again. After a successful mount the volume record is persisted;
// a failure to persist is logged but does not fail the request.
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
	// Check arguments
	if req.GetVolumeCapability() == nil {
		return nil, status.Error(codes.InvalidArgument, "volume capability missing in request")
	}
	if len(req.GetVolumeId()) == 0 {
		return nil, status.Error(codes.InvalidArgument, "volume ID missing in request")
	}
	if len(req.GetTargetPath()) == 0 {
		return nil, status.Error(codes.InvalidArgument, "target path missing in request")
	}

	targetPath := req.GetTargetPath()

	// Kubernetes 1.15 doesn't have csi.storage.k8s.io/ephemeral.
	ephemeralVolume := req.GetVolumeContext()[ephemeralContext] == "true"
	podUUID := req.GetVolumeContext()[podUUIDContext]

	if !ephemeralVolume {
		return nil, status.Error(codes.InvalidArgument, "this CSI driver only supports ephemeral volumes")
	}

	if req.GetVolumeCapability().GetBlock() != nil &&
		req.GetVolumeCapability().GetMount() != nil {
		return nil, status.Error(codes.InvalidArgument, "volume cannot be of both block and mount access type")
	}

	volID := req.GetVolumeId()
	volName := fmt.Sprintf("ephemeral-%s", volID)
	ephVol, err := ns.node.createEphemeralVolume(volID, podUUID, volName, maxStorageCapacity, mountAccess)
	switch {
	case err == nil:
		glog.V(4).Infof("created ephemeral volume: %s", ephVol.Path)
	case os.IsExist(err):
		// An "already exists" error is tolerated, but in that case no volume
		// was returned, so ephVol must not be dereferenced.
		glog.V(4).Infof("ephemeral volume %s already exists", volID)
	default:
		glog.Error("failed to create ephemeral volume: ", err)
		return nil, status.Error(codes.Internal, err.Error())
	}

	vol, err := ns.node.volumeByID(volID)
	if err != nil {
		return nil, status.Error(codes.NotFound, err.Error())
	}

	if req.GetVolumeCapability().GetBlock() != nil {
		if vol.AccessType != blockAccess {
			return nil, status.Error(codes.InvalidArgument, "cannot publish a non-block volume as block volume")
		}

		volPathHandler := volumepathhandler.VolumePathHandler{}

		// Get loop device from the volume path.
		loopDevice, err := volPathHandler.GetLoopDevice(vol.Path)
		if err != nil {
			return nil, status.Error(codes.Internal, fmt.Sprintf("failed to get the loop device: %v", err))
		}

		mounter := mount.New("")

		// Check if the target path exists. Create if not present.
		_, err = os.Lstat(targetPath)
		if os.IsNotExist(err) {
			if err = makeFile(targetPath); err != nil {
				return nil, status.Error(codes.Internal, fmt.Sprintf("failed to create target path: %s: %v", targetPath, err))
			}
		}
		if err != nil {
			return nil, status.Errorf(codes.Internal, "failed to check if the target block file exists: %v", err)
		}

		// Check if the target path is already mounted. Prevent remounting.
		notMount, err := mount.IsNotMountPoint(mounter, targetPath)
		if err != nil {
			if !os.IsNotExist(err) {
				return nil, status.Errorf(codes.Internal, "error checking path %s for mount: %s", targetPath, err)
			}
			notMount = true
		}
		if !notMount {
			// It's already mounted.
			glog.V(5).Infof("Skipping bind-mounting subpath %s: already mounted", targetPath)
			return &csi.NodePublishVolumeResponse{}, nil
		}

		if err := mounter.Mount(loopDevice, targetPath, "", []string{"bind"}); err != nil {
			return nil, status.Error(codes.Internal, fmt.Sprintf("failed to mount block device: %s at %s: %v", loopDevice, targetPath, err))
		}
	} else if req.GetVolumeCapability().GetMount() != nil {
		if vol.AccessType != mountAccess {
			return nil, status.Error(codes.InvalidArgument, "cannot publish a non-mount volume as mount volume")
		}

		notMnt, err := mount.IsNotMountPoint(mount.New(""), targetPath)
		if err != nil {
			if os.IsNotExist(err) {
				if err = os.MkdirAll(targetPath, 0750); err != nil {
					return nil, status.Error(codes.Internal, err.Error())
				}
				notMnt = true
			} else {
				return nil, status.Error(codes.Internal, err.Error())
			}
		}

		if !notMnt {
			return &csi.NodePublishVolumeResponse{}, nil
		}

		fsType := req.GetVolumeCapability().GetMount().GetFsType()

		deviceId := ""
		if req.GetPublishContext() != nil {
			deviceId = req.GetPublishContext()[deviceID]
		}

		readOnly := req.GetReadonly()
		volumeId := req.GetVolumeId()
		attrib := req.GetVolumeContext()
		mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()

		glog.V(4).Infof(
			"target %v\nfstype %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
			targetPath,
			fsType,
			deviceId,
			readOnly,
			volumeId,
			attrib,
			mountFlags,
		)

		options := []string{"bind"}
		if readOnly {
			options = append(options, "ro")
		}

		mounter := mount.New("")
		volumePath := fullpath(ns.node.workdir, podUUID, volumeId)

		if err := mounter.Mount(volumePath, targetPath, "", options); err != nil {
			var errList strings.Builder
			errList.WriteString(err.Error())
			// The volume is unusable without its mount; remove what was created for it.
			if vol.Ephemeral {
				if rmErr := os.RemoveAll(volumePath); rmErr != nil && !os.IsNotExist(rmErr) {
					errList.WriteString(fmt.Sprintf(" :%s", rmErr.Error()))
				}
			}
			return nil, status.Error(codes.Internal, fmt.Sprintf("failed to mount device: %s at %s: %s", volumePath, targetPath, errList.String()))
		}
	} else {
		return nil, status.Error(codes.InvalidArgument, "volume must be of block or mount access type")
	}

	// Persist newly created ephemeral volume into storage due to the fact that we need the PodUUID information
	// when deleting this object. The record looked up from the node is used so that
	// this also works when the volume already existed.
	err = ns.node.storage.Update(func(tx *bolt.Tx) error {
		bucket := tx.Bucket([]byte(volumesBucketName))
		if bucket == nil {
			return fmt.Errorf("bucket %s does not exist", volumesBucketName)
		}

		marshaledVol, err := json.Marshal(vol)
		if err != nil {
			return fmt.Errorf("unable to serialize volume %s: %w", volID, err)
		}

		err = bucket.Put([]byte(volID), marshaledVol)
		if err != nil {
			return fmt.Errorf("unable to insert volume %s into database: %w", volID, err)
		}
		return nil
	})
	if err != nil {
		glog.Errorf("Unable to persist volume %s: %s", volID, err)
	}

	return &csi.NodePublishVolumeResponse{}, nil
}
// NodeUnpublishVolume queues the volume's backing directory for deferred
// deletion, unmounts the target path if it is a mount point, and forgets the
// volume record.
//
// The request must carry a volume ID and a target path; otherwise
// InvalidArgument is returned. The data on disk is not removed here: it is
// handed to the deletion queue and removed later by the pruner.
//
// A failure to delete the persisted volume record is logged but does not fail
// the request.
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
	// Validate the request that was sent
	if len(req.GetVolumeId()) == 0 {
		return nil, status.Error(codes.InvalidArgument, "volume ID missing in request")
	}
	if len(req.GetTargetPath()) == 0 {
		return nil, status.Error(codes.InvalidArgument, "target path missing in request")
	}

	var vol volume
	var err error

	targetPath := req.GetTargetPath()
	volumeID := req.GetVolumeId()

	vol, err = ns.node.volumeByID(volumeID)
	if err != nil {
		glog.V(4).Infof("handling deletion for volume %v even though it was not found in memory", volumeID)
	} else if !vol.Ephemeral {
		glog.Warningf("handling deletion for volume %v even though it is not ephemeral", vol)
	}

	// Queue folder that was previously mounted on to pod for deletion. Note that this is different
	// than the point where the folder was bind mounted to.
	ns.node.deletedVolumes.queue(
		volumeID,
		deletionCandidate{
			Time:     time.Now(),
			Lifespan: ns.node.afterLifespan,
			Path:     fullpath(ns.node.workdir, vol.PodUUID, volumeID),
		},
	)

	// Unmount only if the target path is really a mount point.
	// This will not delete the underlying data stored in the working directory.
	notMnt, err := mount.IsNotMountPoint(mount.New(""), targetPath)
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, status.Error(codes.Internal, err.Error())
		}
	} else if !notMnt {
		// Un-mounting the image or filesystem.
		err = mount.New("").Unmount(targetPath)
		if err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
	}

	if vol.AccessType == blockAccess {
		// Report the failure as a gRPC status like every other error path of this RPC.
		return nil, status.Error(codes.Unimplemented, "block access is unsupported by this driver")
	}

	// Delete persisted volume information
	err = ns.node.storage.Update(func(tx *bolt.Tx) error {
		bucket := tx.Bucket([]byte(volumesBucketName))
		// Guard against a missing bucket, as NodePublishVolume does, instead of
		// dereferencing a nil bucket.
		if bucket == nil {
			return fmt.Errorf("bucket %s does not exist", volumesBucketName)
		}

		if err := bucket.Delete([]byte(volumeID)); err != nil {
			return fmt.Errorf("unable to delete %s from permanent storage: %w", volumeID, err)
		}
		return nil
	})
	if err != nil {
		glog.Error(err)
	}

	delete(ns.node.volumes, volumeID)

	// Since we've already successfully queued the local volume for deletion, we return
	// a payload indicating that the delete request was successful. The actual deletion from the local
	// disk will take place at a later time.
	// TODO(rdelvalle): Since we always tell k8s that we've been successful in removing the disk, we can run into
	// an inconsistency issue when we are unable to delete a volume. Explore options to handle this.
	return &csi.NodeUnpublishVolumeResponse{}, nil
}
// NodeStageVolume only validates the request; no staging work is performed.
// It returns InvalidArgument when the volume ID, the staging target path, or
// the volume capability is missing.
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
	switch {
	case req.GetVolumeId() == "":
		return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
	case req.GetStagingTargetPath() == "":
		return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
	case req.GetVolumeCapability() == nil:
		return nil, status.Error(codes.InvalidArgument, "Volume Capability missing in request")
	}

	return &csi.NodeStageVolumeResponse{}, nil
}
// NodeUnstageVolume only validates the request; no unstaging work is performed.
// It returns InvalidArgument when the volume ID or the staging target path is
// missing.
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
	switch {
	case req.GetVolumeId() == "":
		return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
	case req.GetStagingTargetPath() == "":
		return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
	}

	return &csi.NodeUnstageVolumeResponse{}, nil
}
// NodeGetInfo reports the node ID, the per-node volume limit, and a topology
// with a single segment that maps TopologyKeyNode to the node ID.
func (ns *nodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
	nodeID := ns.node.id

	info := &csi.NodeGetInfoResponse{
		NodeId:            nodeID,
		MaxVolumesPerNode: ns.node.maxVolumes,
		AccessibleTopology: &csi.Topology{
			Segments: map[string]string{TopologyKeyNode: nodeID},
		},
	}
	return info, nil
}
// NodeGetCapabilities returns a single RPC capability entry with the
// zero-value type.
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
	rpcCap := &csi.NodeServiceCapability{
		Type: &csi.NodeServiceCapability_Rpc{
			Rpc: &csi.NodeServiceCapability_RPC{},
		},
	}

	resp := &csi.NodeGetCapabilitiesResponse{
		Capabilities: []*csi.NodeServiceCapability{rpcCap},
	}
	return resp, nil
}
// NodeGetVolumeStats is not supported; it always returns an Unimplemented
// status with an empty message.
func (ns *nodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// NodeExpandVolume is only implemented so the driver can be used for e2e testing.
// It always returns an Unimplemented status with an empty message.
func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

125
pkg/katbox/server.go Normal file
View file

@ -0,0 +1,125 @@
/*
Copyright 2019 The Kubernetes Authors.
Modifications copyright 2020 PayPal.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package katbox
import (
"fmt"
"net"
"os"
"strings"
"sync"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
// NewNonBlockingGRPCServer returns a server in its zero state; call Start to
// begin serving.
func NewNonBlockingGRPCServer() *nonBlockingGRPCServer {
	return new(nonBlockingGRPCServer)
}
// nonBlockingGRPCServer runs a gRPC server in a background goroutine so that
// Start returns immediately.
type nonBlockingGRPCServer struct {
// wg is incremented by Start and waited on by Wait.
wg sync.WaitGroup
// server is the underlying gRPC server; it is assigned inside serve.
server *grpc.Server
}
// Start launches the gRPC server for the given endpoint in a background
// goroutine and returns immediately. The wait group is incremented first so
// that Wait blocks on the serving goroutine.
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, ns csi.NodeServer) {
	s.wg.Add(1)

	go s.serve(endpoint, ids, ns)
}
// Wait blocks until the server's wait group reaches zero.
func (s *nonBlockingGRPCServer) Wait() {
s.wg.Wait()
}
// Stop shuts the gRPC server down by calling GracefulStop on it.
// NOTE(review): s.server is only assigned inside serve, so calling Stop before
// the serving goroutine has created the server would dereference nil — confirm
// callers.
func (s *nonBlockingGRPCServer) Stop() {
s.server.GracefulStop()
}
// ForceStop shuts the gRPC server down by calling Stop on it.
// NOTE(review): s.server is only assigned inside serve, so calling ForceStop
// before the serving goroutine has created the server would dereference nil —
// confirm callers.
func (s *nonBlockingGRPCServer) ForceStop() {
s.server.Stop()
}
// serve parses endpoint, opens the listener, registers the provided services,
// and serves gRPC requests until the server stops.
//
// For unix endpoints a stale socket file at the address is removed first.
// Failures to parse the endpoint, remove the socket, or listen are fatal.
//
// The wait group is released when serve returns so that Wait unblocks once the
// server has stopped.
func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, ns csi.NodeServer) {
	// Pair the Add(1) done in Start; without this Wait would block forever.
	defer s.wg.Done()

	proto, addr, err := parseEndpoint(endpoint)
	if err != nil {
		glog.Fatal(err.Error())
	}

	if proto == "unix" {
		addr = "/" + addr
		if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { //nolint: vetshadow
			glog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
		}
	}

	listener, err := net.Listen(proto, addr)
	if err != nil {
		glog.Fatalf("Failed to listen: %v", err)
	}

	opts := []grpc.ServerOption{
		grpc.UnaryInterceptor(logGRPC),
	}
	server := grpc.NewServer(opts...)
	s.server = server

	if ids != nil {
		csi.RegisterIdentityServer(server, ids)
	}
	if ns != nil {
		csi.RegisterNodeServer(server, ns)
	}

	glog.Infof("Listening for connections on address: %#v", listener.Addr())

	if err := server.Serve(listener); err != nil {
		glog.Errorf("GRPC server stopped serving: %v", err)
	}
}
// parseEndpoint splits an endpoint of the form "<scheme>://<address>" into its
// scheme and address.
//
// Only the "unix" and "tcp" schemes are accepted. The scheme is matched
// case-insensitively and returned in lower case so that it can be passed
// directly to net.Listen. An error is returned when the scheme is not
// supported or the address is empty.
func parseEndpoint(ep string) (string, string, error) {
	parts := strings.SplitN(ep, "://", 2)
	if len(parts) == 2 && parts[1] != "" {
		switch scheme := strings.ToLower(parts[0]); scheme {
		case "unix", "tcp":
			return scheme, parts[1], nil
		}
	}
	return "", "", fmt.Errorf("invalid endpoint: %v", ep)
}
// logGRPC is a unary server interceptor that logs each call.
// At verbosity 5 it logs the full method name and the request with secrets
// stripped, then invokes handler. A handler error is logged with glog.Errorf;
// otherwise the response (secrets stripped) is logged at verbosity 5.
// The handler's response and error are returned unchanged.
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	method := info.FullMethod
	glog.V(5).Infof("GRPC call: %s", method)
	glog.V(5).Infof("GRPC request: %+v", protosanitizer.StripSecrets(req))

	reply, callErr := handler(ctx, req)
	if callErr == nil {
		glog.V(5).Infof("GRPC response: %+v", protosanitizer.StripSecrets(reply))
		return reply, callErr
	}

	glog.Errorf("GRPC error: %v", callErr)
	return reply, callErr
}

47
pkg/katbox/types.go Normal file
View file

@ -0,0 +1,47 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package katbox
const (
// deviceID is the literal string "deviceID".
// NOTE(review): presumably used as a lookup key for a device identifier — confirm at call sites.
deviceID = "deviceID"
// maxStorageCapacity equals tib, the 1024-gib constant declared in this file.
// NOTE(review): presumably the upper bound on a requested volume size — confirm at call sites.
maxStorageCapacity = tib
)
// Storage sizes as int64 constants. kib, mib, gib and tib are each 1024 times
// the previous unit; gib100 and tib100 are 100 times gib and tib respectively.
// NOTE(review): presumably byte counts — confirm against callers.
const (
kib int64 = 1024
mib int64 = kib * 1024
gib int64 = mib * 1024
gib100 int64 = gib * 100
tib int64 = gib * 1024
tib100 int64 = tib * 100
)
// accessType is an integer enumeration with two values declared below.
// NOTE(review): the names suggest filesystem-mount versus raw-block volume
// access — confirm against the code that switches on this type.
type accessType int
const (
// mountAccess is the zero value of accessType.
mountAccess accessType = iota
// blockAccess has the value 1.
blockAccess
)
// Available contexts for volume.
// NOTE(review): these strings look like the volume-context keys Kubernetes
// passes to a CSI driver for pod info and ephemeral volumes — confirm
// against the code that reads them.
const (
// podUUIDContext is the key "csi.storage.k8s.io/pod.uid".
podUUIDContext = "csi.storage.k8s.io/pod.uid"
// ephemeralContext is the key "csi.storage.k8s.io/ephemeral".
ephemeralContext = "csi.storage.k8s.io/ephemeral"
)
// Bucket names.
// NOTE(review): presumably the names of key/value store buckets holding live
// and deleted volume records — confirm against the storage code.
const (
volumesBucketName = "volumes"
deletedVolumesBucketName = "deletedVolumes"
)

73
pkg/katbox/util.go Normal file
View file

@ -0,0 +1,73 @@
/*
Copyright 2020 PayPal.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package katbox
import (
"errors"
"math"
"os"
"path/filepath"
"github.com/golang/glog"
)
// fullpath builds the location of a katbox volume inside the container running
// the katbox plugin by joining workdir, podUUID and p, in that order, into a
// single cleaned path.
func fullpath(workdir, podUUID, p string) string {
	segments := []string{workdir, podUUID, p}
	return filepath.Join(segments...)
}
// pressureFactor returns a value to be used with afterlife calculations.
// The more space we're using in the headroom, the more aggressive the early eviction should be.
// Therefore, as the ratio of headroom space available diminishes, our ratio gets lower and lower.
// Since we will be multiplying by a time.Duration which is added to the deletion time, we multiply by
// the complement of the percentage of used headroom.
// e.g:
// 40% of our headroom is being used. Therefore we need to reduce the afterlife by 40% (use 60% of afterlife).
// e.g: 600 seconds * .60 = 360 seconds.
// Thus when we check if we should evict, the calculation will be done using 360 seconds after the delete
// happened instead of 600 after the delete happened.
// This concept is inspired by the Apache Mesos disk pressure feature.
//
// total and free must be expressed in the same unit. headroom is the fraction
// of total reserved as headroom and must lie in [0, 1]; any other value,
// including NaN, yields an error. The result is 1.0 while free is at least the
// headroom space, and otherwise the fraction of the headroom that is still free.
func pressureFactor(total, free uint64, headroom float64) (float64, error) {
	// NaN compares false against every bound, so it has to be rejected
	// explicitly before it reaches the uint64 conversion below.
	if math.IsNaN(headroom) || headroom < 0.0 || headroom > 1.0 {
		return 0, errors.New("headroom must be a value between 0 and 1.0 (inclusive)")
	}

	headroomSpace := uint64(math.Ceil(float64(total) * headroom))
	glog.V(5).Infof("Total Size: %v\nTotal Free: %v\nTotal Headroom Space: %v", total, free, headroomSpace)

	// Not eating into the headroom yet: use the whole afterlife. This also
	// covers headroomSpace == 0, so the division below never divides by zero.
	if free >= headroomSpace {
		return 1.0, nil
	}

	// Determines how far into the headroom space we currently are and returns the inverse as that's how
	// much of the afterlife we should be using.
	return 1.0 - float64(headroomSpace-free)/float64(headroomSpace), nil
}
// makeFile ensures that the file exists, creating it if necessary.
// The parent directory must exist.
//
// An already-exists error from opening the file is treated as success. Any
// other open error is returned, as is an error from closing the handle.
func makeFile(pathname string) error {
	f, err := os.OpenFile(pathname, os.O_CREATE, os.FileMode(0644))
	if err != nil {
		if os.IsExist(err) {
			return nil
		}
		return err
	}
	// Close only after a successful open; the handle is needed solely to
	// create the file.
	return f.Close()
}

51
pkg/katbox/util_test.go Normal file
View file

@ -0,0 +1,51 @@
/*
Copyright 2020 PayPal.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package katbox
import (
"testing"
"github.com/stretchr/testify/assert"
)
// Test_pressureFactor runs pressureFactor against a table of inputs. Cases
// flagged expectErr only require an error; all other cases require the exact
// expected factor and no error.
func Test_pressureFactor(t *testing.T) {
	type testCase struct {
		name           string
		total          uint64
		free           uint64
		headroom       float64
		expectedFactor float64
		expectErr      bool
	}

	cases := []testCase{
		{name: "underUsage", total: 1000, free: 110, headroom: .10, expectedFactor: 1.0, expectErr: false},
		{name: "overUsage", total: 1000, free: 99, headroom: .10, expectedFactor: .99, expectErr: false},
		{name: "atUsage", total: 1000, free: 100, headroom: .10, expectedFactor: 1.0, expectErr: false},
		{name: "negativeHeadroom", total: 1000, free: 100, headroom: -.10, expectedFactor: 0.0, expectErr: true},
		{name: "greaterThan100", total: 1000, free: 100, headroom: 10, expectedFactor: 0.0, expectErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			factor, err := pressureFactor(tc.total, tc.free, tc.headroom)
			if tc.expectErr {
				assert.Error(t, err, "expected an error")
				return
			}
			assert.EqualValues(t, tc.expectedFactor, factor, "Incorrect pressure factor")
			assert.NoError(t, err, "unexpected error")
		})
	}
}