- <!DOCTYPE html>
- <html>
- <head>
- <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
- <title>cmd: Go Coverage Report</title>
- <style>
- body {
- background: black;
- color: rgb(80, 80, 80);
- }
- body, pre, #legend span {
- font-family: Menlo, monospace;
- font-weight: bold;
- }
- #topbar {
- background: black;
- position: fixed;
- top: 0; left: 0; right: 0;
- height: 42px;
- border-bottom: 1px solid rgb(80, 80, 80);
- }
- #content {
- margin-top: 50px;
- }
- #nav, #legend {
- float: left;
- margin-left: 10px;
- }
- #legend {
- margin-top: 12px;
- }
- #nav {
- margin-top: 10px;
- }
- #legend span {
- margin: 0 5px;
- }
- .cov0 { color: rgb(192, 0, 0) }
- .cov1 { color: rgb(128, 128, 128) }
- .cov2 { color: rgb(116, 140, 131) }
- .cov3 { color: rgb(104, 152, 134) }
- .cov4 { color: rgb(92, 164, 137) }
- .cov5 { color: rgb(80, 176, 140) }
- .cov6 { color: rgb(68, 188, 143) }
- .cov7 { color: rgb(56, 200, 146) }
- .cov8 { color: rgb(44, 212, 149) }
- .cov9 { color: rgb(32, 224, 152) }
- .cov10 { color: rgb(20, 236, 155) }
- </style>
- </head>
- <body>
- <div id="topbar">
- <div id="nav">
- <select id="files">
- <option value="file0">github.com/openshift/csi-driver-shared-resource/cmd/main.go (11.5%)</option>
- <option value="file1">github.com/openshift/csi-driver-shared-resource/pkg/config/config.go (100.0%)</option>
- <option value="file2">github.com/openshift/csi-driver-shared-resource/pkg/config/manager.go (78.3%)</option>
- <option value="file3">github.com/openshift/csi-driver-shared-resource/pkg/csidriver/dpv.go (79.8%)</option>
- <option value="file4">github.com/openshift/csi-driver-shared-resource/pkg/csidriver/driver.go (69.0%)</option>
- <option value="file5">github.com/openshift/csi-driver-shared-resource/pkg/csidriver/file.go (0.0%)</option>
- <option value="file6">github.com/openshift/csi-driver-shared-resource/pkg/csidriver/identityserver.go (0.0%)</option>
- <option value="file7">github.com/openshift/csi-driver-shared-resource/pkg/csidriver/mount.go (42.9%)</option>
- <option value="file8">github.com/openshift/csi-driver-shared-resource/pkg/csidriver/nodeserver.go (67.9%)</option>
- <option value="file9">github.com/openshift/csi-driver-shared-resource/pkg/csidriver/server.go (0.0%)</option>
- <option value="file10">github.com/openshift/csi-driver-shared-resource/pkg/metrics/metrics.go (100.0%)</option>
- <option value="file11">github.com/openshift/csi-driver-shared-resource/pkg/metrics/server.go (57.9%)</option>
- </select>
- </div>
- <div id="legend">
- <span>not tracked</span>
- <span class="cov0">not covered</span>
- <span class="cov8">covered</span>
- </div>
- </div>
- <div id="content">
- <pre class="file" id="file0" style="display: none">package main
- import (
- "flag"
- "fmt"
- "os"
- "os/signal"
- "syscall"
- "time"
- "github.com/spf13/cobra"
- "github.com/spf13/pflag"
- "k8s.io/client-go/kubernetes"
- "k8s.io/klog/v2"
- "k8s.io/utils/mount"
- sharev1clientset "github.com/openshift/client-go/sharedresource/clientset/versioned"
- "github.com/openshift/csi-driver-shared-resource/pkg/cache"
- "github.com/openshift/csi-driver-shared-resource/pkg/client"
- "github.com/openshift/csi-driver-shared-resource/pkg/config"
- "github.com/openshift/csi-driver-shared-resource/pkg/controller"
- "github.com/openshift/csi-driver-shared-resource/pkg/csidriver"
- operatorv1 "github.com/openshift/api/operator/v1"
- )
- var (
- version string // driver version
- cfgFilePath string // path to configuration file
- endPoint string // CSI driver API endpoint for Kubernetes kubelet
- driverName string // name of the CSI driver, registered in the cluster
- nodeID string // current Kubernetes node identifier
- maxVolumesPerNode int64 // maximum number of volumes per node, i.e. per driver instance
- shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
- onlyOneSignalHandler = make(chan struct{})
- )
- var rootCmd = &cobra.Command{
- Use: "csi-driver-shared-resource",
- Version: "0.0.1",
- Short: "",
- Long: ``,
- Run: func(cmd *cobra.Command, args []string) <span class="cov0" title="0">{
- var err error
- cfgManager := config.NewManager(cfgFilePath)
- cfg, err := cfgManager.LoadConfig()
- if err != nil </span><span class="cov0" title="0">{
- fmt.Printf("Failed to load configuration file '%s': %s", cfgFilePath, err.Error())
- os.Exit(1)
- }</span>
- <span class="cov0" title="0">if !cfg.RefreshResources </span><span class="cov0" title="0">{
- fmt.Println("Refresh-Resources disabled")
- }</span>
- <span class="cov0" title="0">if kubeClient, err := loadKubernetesClientset(); err != nil </span><span class="cov0" title="0">{
- fmt.Printf("Failed to load Kubernetes API client: %s", err.Error())
- os.Exit(1)
- }</span> else<span class="cov0" title="0"> {
- client.SetClient(kubeClient)
- }</span>
- <span class="cov0" title="0">if shareClient, err := loadSharedresourceClientset(); err != nil </span><span class="cov0" title="0">{
- fmt.Printf("Failed to load SharedResource API client: %s", err.Error())
- os.Exit(1)
- }</span> else<span class="cov0" title="0"> {
- client.SetShareClient(shareClient)
- }</span>
- <span class="cov0" title="0">driver, err := csidriver.NewCSIDriver(
- csidriver.DataRoot,
- csidriver.VolumeMapRoot,
- driverName,
- nodeID,
- endPoint,
- maxVolumesPerNode,
- version,
- mount.New(""),
- )
- if err != nil </span><span class="cov0" title="0">{
- fmt.Printf("Failed to initialize driver: %s", err.Error())
- os.Exit(1)
- }</span>
- <span class="cov0" title="0">c, err := controller.NewController(cfg.GetShareRelistInterval(), cfg.RefreshResources)
- if err != nil </span><span class="cov0" title="0">{
- fmt.Printf("Failed to set up controller: %s", err.Error())
- os.Exit(1)
- }</span>
- <span class="cov0" title="0">prunerTicker := time.NewTicker(cfg.GetShareRelistInterval())
- prunerDone := make(chan struct{})
- go func() </span><span class="cov0" title="0">{
- for </span><span class="cov0" title="0">{
- select </span>{
- case <-prunerDone:<span class="cov0" title="0">
- return</span>
- case <-prunerTicker.C:<span class="cov0" title="0">
- // remove any orphaned volume files on disk
- driver.Prune(client.GetClient())
- if cfg.RefreshResources </span><span class="cov0" title="0">{
- // in case we missed delete events, clean up unneeded secret/configmap informers
- c.PruneSecretInformers(cache.NamespacesWithSharedSecrets())
- c.PruneConfigMapInformers(cache.NamespacesWithSharedConfigMaps())
- }</span>
- }
- }
- }()
- <span class="cov0" title="0">go runOperator(c, cfg)
- go watchForConfigChanges(cfgManager)
- driver.Run()
- prunerDone <- struct{}{}</span>
- },
- }
- func main() <span class="cov0" title="0">{
- if err := rootCmd.Execute(); err != nil </span><span class="cov0" title="0">{
- fmt.Println(err)
- os.Exit(1)
- }</span>
- }
- func init() <span class="cov8" title="1">{
- klog.InitFlags(nil)
- pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
- cobra.OnInitialize()
- rootCmd.Flags().AddGoFlagSet(flag.CommandLine)
- rootCmd.Flags().StringVar(&cfgFilePath, "config", "/var/run/configmaps/config/config.yaml", "configuration file path")
- rootCmd.Flags().StringVar(&endPoint, "endpoint", "unix:///csi/csi.sock", "CSI endpoint")
- rootCmd.Flags().StringVar(&driverName, "drivername", string(operatorv1.SharedResourcesCSIDriver), "name of the driver")
- rootCmd.Flags().StringVar(&nodeID, "nodeid", "", "node id")
- rootCmd.Flags().Int64Var(&maxVolumesPerNode, "maxvolumespernode", 0, "limit of volumes per node")
- }</span>
- // loadKubernetesClientset instantiates a clientset using the local config.
- func loadKubernetesClientset() (kubernetes.Interface, error) <span class="cov0" title="0">{
- kubeRestConfig, err := client.GetConfig()
- if err != nil </span><span class="cov0" title="0">{
- return nil, err
- }</span>
- <span class="cov0" title="0">return kubernetes.NewForConfig(kubeRestConfig)</span>
- }
- func loadSharedresourceClientset() (sharev1clientset.Interface, error) <span class="cov0" title="0">{
- kubeRestConfig, err := client.GetConfig()
- if err != nil </span><span class="cov0" title="0">{
- return nil, err
- }</span>
- <span class="cov0" title="0">return sharev1clientset.NewForConfig(kubeRestConfig)</span>
- }
- // runOperator spawns and runs the Controller with the given configuration, until an OS
- // signal is trapped.
- func runOperator(c *controller.Controller, cfg *config.Config) <span class="cov0" title="0">{
- stopCh := setupSignalHandler()
- err := c.Run(stopCh)
- if err != nil </span><span class="cov0" title="0">{
- fmt.Printf("Controller exited: %s", err.Error())
- os.Exit(1)
- }</span>
- }
- // watchForConfigChanges keeps checking whether the configuration file has changed on disk, and
- // in that case makes the operator exit. The new configuration takes effect once a new instance starts.
- func watchForConfigChanges(mgr *config.Manager) <span class="cov0" title="0">{
- for </span><span class="cov0" title="0">{
- if mgr.ConfigHasChanged() </span><span class="cov0" title="0">{
- fmt.Println("Configuration has changed on disk, restarting the operator!")
- os.Exit(0)
- }</span>
- <span class="cov0" title="0">time.Sleep(3 * time.Second)</span>
- }
- }
- // setupSignalHandler registers for SIGTERM and SIGINT. A stop channel is returned
- // which is closed on the first of these signals. If a second signal is caught, the program
- // is terminated with exit code 1.
- func setupSignalHandler() (stopCh <-chan struct{}) <span class="cov0" title="0">{
- close(onlyOneSignalHandler) // panics when called twice
- stop := make(chan struct{})
- c := make(chan os.Signal, 2)
- signal.Notify(c, shutdownSignals...)
- go func() </span><span class="cov0" title="0">{
- <-c
- close(stop)
- <-c
- os.Exit(1) // second signal. Exit directly.
- }</span>()
- <span class="cov0" title="0">return stop</span>
- }
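- // Illustrative only (not part of the original file): a hedged sketch of how the flags registered
- // in init() above might be passed when launching the driver binary. The endpoint and config values
- // shown are the defaults from init(); the node name is an assumed example and is normally injected
- // by the deployment.
- //
- //   csi-driver-shared-resource \
- //     --nodeid=example-node \
- //     --endpoint=unix:///csi/csi.sock \
- //     --config=/var/run/configmaps/config/config.yaml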
- </pre>
- <pre class="file" id="file1" style="display: none">package config
- import (
- "time"
- "k8s.io/klog/v2"
- )
- const DefaultResyncDuration = 10 * time.Minute
- // Config configuration attributes.
- type Config struct {
- // ShareRelistInterval is the interval at which all "Share" object instances are relisted.
- ShareRelistInterval string `yaml:"shareRelistInterval,omitempty"`
- // RefreshResources toggles actively watching for resources; when disabled, resources are only
- // read before mount.
- RefreshResources bool `yaml:"refreshResources,omitempty"`
- }
- var LoadedConfig Config
- // GetShareRelistInterval returns the ShareRelistInterval value as a time.Duration. On parse error,
- // the default value is used instead.
- func (c *Config) GetShareRelistInterval() time.Duration <span class="cov8" title="1">{
- resyncDuration, err := time.ParseDuration(c.ShareRelistInterval)
- if err != nil </span><span class="cov8" title="1">{
- klog.Errorf("Error on parsing ShareRelistInterval '%s': %s", c.ShareRelistInterval, err)
- return DefaultResyncDuration
- }</span>
- <span class="cov8" title="1">return resyncDuration</span>
- }
- // NewConfig returns a Config instance using the default attribute values.
- func NewConfig() Config <span class="cov8" title="1">{
- return Config{
- ShareRelistInterval: DefaultResyncDuration.String(),
- RefreshResources: true,
- }
- }</span>
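- // exampleRelistInterval is an illustrative sketch, not part of the original file: it shows how
- // GetShareRelistInterval parses the configured string and falls back to DefaultResyncDuration
- // when parsing fails. The function name and sample values are assumptions for demonstration only.
- func exampleRelistInterval() time.Duration {
- cfg := NewConfig()
- cfg.ShareRelistInterval = "15m" // a valid duration string is used as-is
- _ = cfg.GetShareRelistInterval()
- cfg.ShareRelistInterval = "not-a-duration" // parse failure logs an error...
- return cfg.GetShareRelistInterval()        // ...and yields DefaultResyncDuration (10m)
- }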
- </pre>
- <pre class="file" id="file2" style="display: none">package config
- import (
- "crypto/md5"
- "encoding/hex"
- "io/ioutil"
- "os"
- "gopkg.in/yaml.v2"
- "k8s.io/klog/v2"
- )
- // Manager controls loading of the configuration file, and can detect whether it has changed on disk.
- type Manager struct {
- cfgFilePath string // path to configuration file
- md5sum string // md5sum of the initial content
- }
- // ConfigHasChanged reads the current configuration file contents and compares them with the
- // contents the Manager was instantiated with.
- func (m *Manager) ConfigHasChanged() bool <span class="cov8" title="1">{
- // given the md5sum is not yet set, the configuration payload won't be marked as changed
- if m.md5sum == "" </span><span class="cov0" title="0">{
- return false
- }</span>
- // read the configuration file payload again and compare it with the stored md5sum; when there
- // are errors reading the file, the configuration is not marked as changed
- <span class="cov8" title="1">payload, err := ioutil.ReadFile(m.cfgFilePath)
- if err != nil </span><span class="cov0" title="0">{
- klog.Errorf("Reading configuration-file '%s': '%#v'", m.cfgFilePath, err)
- return false
- }</span>
- <span class="cov8" title="1">sum := md5.Sum(payload)
- return m.md5sum != hex.EncodeToString(sum[:])</span>
- }
- // LoadConfig reads the local configuration file and records a checksum of the current contents,
- // so we can detect changes later on.
- func (m *Manager) LoadConfig() (*Config, error) <span class="cov8" title="1">{
- cfg := NewConfig()
- if _, err := os.Stat(m.cfgFilePath); os.IsNotExist(err) </span><span class="cov8" title="1">{
- klog.Info("Configuration file is not found, using default values!")
- return &cfg, nil
- }</span>
- // in case of issues reading the mounted file, or errors unmarshaling into the destination
- // struct, this method surfaces those errors directly; we may want to create means to
- // differentiate the error scenarios
- <span class="cov8" title="1">klog.Infof("Loading configuration-file '%s'", m.cfgFilePath)
- payload, err := ioutil.ReadFile(m.cfgFilePath)
- if err != nil </span><span class="cov0" title="0">{
- return nil, err
- }</span>
- <span class="cov8" title="1">sum := md5.Sum(payload)
- m.md5sum = hex.EncodeToString(sum[:])
- // overwrite the default attribute values with those found in the configuration file
- if err = yaml.Unmarshal(payload, &cfg); err != nil </span><span class="cov0" title="0">{
- return nil, err
- }</span>
- <span class="cov8" title="1">LoadedConfig = cfg
- return &cfg, nil</span>
- }
- // NewManager instantiates the manager.
- func NewManager(cfgFilePath string) *Manager <span class="cov8" title="1">{
- return &Manager{cfgFilePath: cfgFilePath}
- }</span>
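- // exampleManagerUsage is an illustrative sketch, not part of the original file: it mirrors how
- // cmd/main.go drives the Manager, loading the configuration once and later checking whether the
- // file has changed on disk. The path is the default configuration path used by cmd/main.go.
- func exampleManagerUsage() (*Config, bool, error) {
- mgr := NewManager("/var/run/configmaps/config/config.yaml")
- // LoadConfig records the md5sum of the payload so ConfigHasChanged can compare against it later
- cfg, err := mgr.LoadConfig()
- if err != nil {
- return nil, false, err
- }
- // cmd/main.go polls ConfigHasChanged in a loop and exits so a new instance picks up the change
- return cfg, mgr.ConfigHasChanged(), nil
- }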
- </pre>
- <pre class="file" id="file3" style="display: none">package csidriver
- import (
- "encoding/json"
- "os"
- "path/filepath"
- "sync"
- "k8s.io/klog/v2"
- "github.com/openshift/csi-driver-shared-resource/pkg/consts"
- )
- // NOTE / TODO: the fields in this struct need to start with a capital letter since we are
- // externalizing / storing to disk, unless there is some way to get the golang encoding
- // logic to use our getters/setters
- type driverVolume struct {
- VolID string `json:"volID"`
- VolName string `json:"volName"`
- VolSize int64 `json:"volSize"`
- VolPathAnchorDir string `json:"volPathAnchorDir"`
- VolPathBindMountDir string `json:"volPathBindMountDir"`
- VolAccessType accessType `json:"volAccessType"`
- TargetPath string `json:"targetPath"`
- SharedDataKind string `json:"sharedDataKind"`
- SharedDataId string `json:"sharedDataId"`
- PodNamespace string `json:"podNamespace"`
- PodName string `json:"podName"`
- PodUID string `json:"podUID"`
- PodSA string `json:"podSA"`
- Refresh bool `json:"refresh"`
- // dpv's can be accessed/modified by both the SharedSecret/SharedConfigMap events and the configmap/secret events; to prevent data races
- // we serialize access to a given dpv with a per-dpv mutex; access to dpv fields should not
- // be done directly, but only through each field's getter and setter, which take the per-dpv
- // Lock to prevent golang data races
- Lock *sync.Mutex `json:"-"` // we do not want this persisted to and from disk, so use of `json:"-"`
- }
- func CreateDV(volID string) *driverVolume <span class="cov8" title="1">{
- dpv := &driverVolume{VolID: volID, Lock: &sync.Mutex{}}
- setDPV(volID, dpv)
- return dpv
- }</span>
- func (dpv *driverVolume) GetVolID() string <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return dpv.VolID
- }</span>
- func (dpv *driverVolume) GetVolName() string <span class="cov0" title="0">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return dpv.VolName
- }</span>
- func (dpv *driverVolume) GetVolSize() int64 <span class="cov0" title="0">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return dpv.VolSize
- }</span>
- func (dpv *driverVolume) GetVolPathAnchorDir() string <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return dpv.VolPathAnchorDir
- }</span>
- func (dpv *driverVolume) GetVolPathBindMountDir() string <span class="cov0" title="0">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return dpv.VolPathBindMountDir
- }</span>
- func (dpv *driverVolume) GetVolAccessType() accessType <span class="cov0" title="0">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return dpv.VolAccessType
- }</span>
- func (dpv *driverVolume) GetTargetPath() string <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return dpv.TargetPath
- }</span>
- func (dpv *driverVolume) GetSharedDataKind() consts.ResourceReferenceType <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return consts.ResourceReferenceType(dpv.SharedDataKind)
- }</span>
- func (dpv *driverVolume) GetSharedDataId() string <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return dpv.SharedDataId
- }</span>
- func (dpv *driverVolume) GetPodNamespace() string <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return dpv.PodNamespace
- }</span>
- func (dpv *driverVolume) GetPodName() string <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return dpv.PodName
- }</span>
- func (dpv *driverVolume) GetPodUID() string <span class="cov0" title="0">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return dpv.PodUID
- }</span>
- func (dpv *driverVolume) GetPodSA() string <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return dpv.PodSA
- }</span>
- func (dpv *driverVolume) IsRefresh() bool <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- return dpv.Refresh
- }</span>
- func (dpv *driverVolume) SetVolName(volName string) <span class="cov0" title="0">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- dpv.VolName = volName
- }</span>
- func (dpv *driverVolume) SetVolSize(size int64) <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- dpv.VolSize = size
- }</span>
- func (dpv *driverVolume) SetVolPathAnchorDir(path string) <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- dpv.VolPathAnchorDir = path
- }</span>
- func (dpv *driverVolume) SetVolPathBindMountDir(path string) <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- dpv.VolPathBindMountDir = path
- }</span>
- func (dpv *driverVolume) SetVolAccessType(at accessType) <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- dpv.VolAccessType = at
- }</span>
- func (dpv *driverVolume) SetTargetPath(path string) <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- dpv.TargetPath = path
- }</span>
- func (dpv *driverVolume) SetSharedDataKind(kind string) <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- dpv.SharedDataKind = kind
- }</span>
- func (dpv *driverVolume) SetSharedDataId(id string) <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- dpv.SharedDataId = id
- }</span>
- func (dpv *driverVolume) SetPodNamespace(namespace string) <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- dpv.PodNamespace = namespace
- }</span>
- func (dpv *driverVolume) SetPodName(name string) <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- dpv.PodName = name
- }</span>
- func (dpv *driverVolume) SetPodUID(uid string) <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- dpv.PodUID = uid
- }</span>
- func (dpv *driverVolume) SetPodSA(sa string) <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- dpv.PodSA = sa
- }</span>
- func (dpv *driverVolume) SetRefresh(refresh bool) <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- dpv.Refresh = refresh
- }</span>
- func (dpv *driverVolume) StoreToDisk(volMapRoot string) error <span class="cov8" title="1">{
- dpv.Lock.Lock()
- defer dpv.Lock.Unlock()
- klog.V(4).Infof("storeVolToDisk %s", dpv.VolID)
- defer klog.V(4).Infof("storeVolToDisk exit %s", dpv.VolID)
- f, terr := os.Open(volMapRoot)
- if terr != nil </span><span class="cov0" title="0">{
- // catch for unit tests
- return nil
- }</span>
- <span class="cov8" title="1">defer f.Close()
- filePath := filepath.Join(volMapRoot, dpv.VolID)
- dataFile, err := os.Create(filePath)
- if err != nil </span><span class="cov0" title="0">{
- return err
- }</span>
- <span class="cov8" title="1">defer dataFile.Close()
- dataEncoder := json.NewEncoder(dataFile)
- return dataEncoder.Encode(dpv)</span>
- }
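- // exampleStoreVolume is an illustrative sketch, not part of the original file: it shows the
- // intended lifecycle of a driverVolume record - create it, populate it through the locked
- // setters, and persist it under the volume-map root so it can be reloaded after a driver
- // restart. The IDs and paths used here are assumptions for demonstration only.
- func exampleStoreVolume(volMapRoot string) error {
- dpv := CreateDV("example-vol-id") // also registers the volume in the in-memory map
- dpv.SetPodNamespace("example-ns")
- dpv.SetPodName("example-pod")
- dpv.SetSharedDataKind(string(consts.ResourceReferenceTypeSecret))
- dpv.SetSharedDataId("example-shared-secret")
- dpv.SetTargetPath(filepath.Join("/var/lib/kubelet/pods", "example-pod-uid", "volumes", "example"))
- // StoreToDisk serializes the record as JSON in a file named after its volume ID
- return dpv.StoreToDisk(volMapRoot)
- }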
- </pre>
- <pre class="file" id="file4" style="display: none">/*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package csidriver
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "k8s.io/utils/mount"
- "os"
- "path/filepath"
- "strings"
- "sync"
- corev1 "k8s.io/api/core/v1"
- kerrors "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/client-go/kubernetes"
- "k8s.io/klog/v2"
- sharev1alpha1 "github.com/openshift/api/sharedresource/v1alpha1"
- objcache "github.com/openshift/csi-driver-shared-resource/pkg/cache"
- "github.com/openshift/csi-driver-shared-resource/pkg/client"
- "github.com/openshift/csi-driver-shared-resource/pkg/config"
- "github.com/openshift/csi-driver-shared-resource/pkg/consts"
- )
- type driver struct {
- name string
- nodeID string
- version string
- endpoint string
- ephemeral bool
- maxVolumesPerNode int64
- ids *identityServer
- ns *nodeServer
- root string
- volMapRoot string
- mounter mount.Interface
- }
- var (
- vendorVersion = "dev"
- volumes = sync.Map{}
- )
- const (
- // Directory where data for volumes are persisted.
- // This is ephemeral to facilitate our per-pod, tmpfs,
- // no-bind-mount approach.
- DataRoot = "/run/csi-data-dir"
- // Directory where we persist `volumes`
- // This is a csidriver volume on the local node
- // to maintain state across restarts of the DaemonSet
- VolumeMapRoot = "/csi-volumes-map"
- )
- func (d *driver) getVolume(name string) *driverVolume <span class="cov8" title="1">{
- obj, loaded := volumes.Load(name)
- if loaded </span><span class="cov8" title="1">{
- dv, _ := obj.(*driverVolume)
- return dv
- }</span>
- <span class="cov8" title="1">return nil</span>
- }
- func setDPV(name string, dpv *driverVolume) <span class="cov8" title="1">{
- if dpv.Lock == nil </span><span class="cov0" title="0">{
- dpv.Lock = &sync.Mutex{}
- }</span>
- <span class="cov8" title="1">volumes.Store(name, dpv)</span>
- }
- func remV(name string) <span class="cov8" title="1">{
- volumes.Delete(name)
- }</span>
- type CSIDriver interface {
- createVolume(volID, targetPath string, refresh bool, volCtx map[string]string, cmShare *sharev1alpha1.SharedConfigMap, sShare *sharev1alpha1.SharedSecret, cap int64, volAccessType accessType) (*driverVolume, error)
- getVolume(volID string) *driverVolume
- deleteVolume(volID string) error
- getVolumePath(volID string, volCtx map[string]string) (string, string)
- mapVolumeToPod(dv *driverVolume) error
- Run()
- GetRoot() string
- GetVolMapRoot() string
- Prune(kubeClient kubernetes.Interface)
- }
- // NewCSIDriver instantiates the CSIDriver with the driver details. Optionally, a
- // Kubernetes Clientset can be provided to update (warm up) the object cache before creating the
- // volume (and its data) for mounting on the incoming pod.
- func NewCSIDriver(root, volMapRoot, driverName, nodeID, endpoint string, maxVolumesPerNode int64, version string, mounter mount.Interface) (CSIDriver, error) <span class="cov8" title="1">{
- if driverName == "" </span><span class="cov0" title="0">{
- return nil, errors.New("no driver name provided")
- }</span>
- <span class="cov8" title="1">if nodeID == "" </span><span class="cov0" title="0">{
- return nil, errors.New("no node id provided")
- }</span>
- <span class="cov8" title="1">if endpoint == "" </span><span class="cov0" title="0">{
- return nil, errors.New("no driver endpoint provided")
- }</span>
- <span class="cov8" title="1">if version != "" </span><span class="cov8" title="1">{
- vendorVersion = version
- }</span>
- <span class="cov8" title="1">if err := os.MkdirAll(root, 0750); err != nil </span><span class="cov0" title="0">{
- return nil, fmt.Errorf("failed to create DataRoot: %v", err)
- }</span>
- <span class="cov8" title="1">if err := os.MkdirAll(volMapRoot, 0750); err != nil </span><span class="cov0" title="0">{
- return nil, fmt.Errorf("failed to create VolMapRoot: %v", err)
- }</span>
- <span class="cov8" title="1">klog.Infof("Driver: '%v', Version: '%s'", driverName, vendorVersion)
- klog.Infof("EndPoint: '%s', NodeID: '%s'", endpoint, nodeID)
- if !config.LoadedConfig.RefreshResources </span><span class="cov8" title="1">{
- klog.Info("RefreshResources is disabled and CSIDriver will directly read Kubernetes corev1 resources!")
- }</span>
- <span class="cov8" title="1">d := &driver{
- name: driverName,
- version: vendorVersion,
- nodeID: nodeID,
- endpoint: endpoint,
- maxVolumesPerNode: maxVolumesPerNode,
- root: root,
- volMapRoot: volMapRoot,
- mounter: mounter,
- }
- if err := d.loadVolsFromDisk(); err != nil </span><span class="cov0" title="0">{
- return nil, fmt.Errorf("failed to load volume map on disk: %v", err)
- }</span>
- <span class="cov8" title="1">return d, nil</span>
- }
- func (d *driver) GetRoot() string <span class="cov0" title="0">{
- return d.root
- }</span>
- func (d *driver) GetVolMapRoot() string <span class="cov8" title="1">{
- return d.volMapRoot
- }</span>
- func (d *driver) Run() <span class="cov0" title="0">{
- // Create GRPC servers
- d.ids = NewIdentityServer(d.name, d.version)
- // the node-server will be in always-read-only mode when the object-cache is being populated
- // directly
- d.ns = NewNodeServer(d)
- s := NewNonBlockingGRPCServer()
- s.Start(d.endpoint, d.ids, d.ns)
- s.Wait()
- }</span>
- // getVolumePath returns the canonical paths for a csidriver volume
- func (d *driver) getVolumePath(volID string, volCtx map[string]string) (string, string) <span class="cov8" title="1">{
- podNamespace, podName, podUID, podSA := getPodDetails(volCtx)
- mountIDString := strings.Join([]string{podNamespace, podName, volID}, "-")
- return mountIDString, filepath.Join(d.root, bindDir, volID, podNamespace, podName, podUID, podSA)
- }</span>
- func commonRangerProceedFilter(dv *driverVolume, key interface{}) bool <span class="cov8" title="1">{
- if dv == nil </span><span class="cov0" title="0">{
- return false
- }</span>
- <span class="cov8" title="1">compareKey := ""
- // see if the shared item pertains to this volume
- switch dv.GetSharedDataKind() </span>{
- case consts.ResourceReferenceTypeSecret:<span class="cov8" title="1">
- sharedSecret := client.GetSharedSecret(dv.GetSharedDataId())
- if sharedSecret == nil </span><span class="cov0" title="0">{
- klog.V(6).Infof("commonRangerProceedFilter could not retrieve share %s for %s:%s:%s", dv.GetSharedDataId(), dv.GetPodNamespace(), dv.GetPodName(), dv.GetVolID())
- return false
- }</span>
- <span class="cov8" title="1">compareKey = objcache.BuildKey(sharedSecret.Spec.SecretRef.Namespace, sharedSecret.Spec.SecretRef.Name)</span>
- case consts.ResourceReferenceTypeConfigMap:<span class="cov8" title="1">
- sharedConfigMap := client.GetSharedConfigMap(dv.GetSharedDataId())
- if sharedConfigMap == nil </span><span class="cov0" title="0">{
- klog.V(6).Infof("commonRangerProceedFilter could not retrieve share %s for %s:%s:%s", dv.GetSharedDataId(), dv.GetPodNamespace(), dv.GetPodName(), dv.GetVolID())
- return false
- }</span>
- <span class="cov8" title="1">compareKey = objcache.BuildKey(sharedConfigMap.Spec.ConfigMapRef.Namespace, sharedConfigMap.Spec.ConfigMapRef.Name)</span>
- default:<span class="cov0" title="0">
- klog.Warningf("commonRangerProceedFilter unknown share type for %s:%s:%s: %s", dv.GetPodNamespace(), dv.GetPodName(), dv.GetVolID())
- return false</span>
- }
- <span class="cov8" title="1">keyStr := key.(string)
- if keyStr != compareKey </span><span class="cov8" title="1">{
- klog.V(4).Infof("commonRangerProceedFilter skipping %s as it does not match %s for %s:%s:%s", keyStr, compareKey, dv.GetPodNamespace(), dv.GetPodName(), dv.GetVolID())
- return false
- }</span>
- <span class="cov8" title="1">return true</span>
- }
- func commonUpsertRanger(dv *driverVolume, key, value interface{}) error <span class="cov8" title="1">{
- proceed := commonRangerProceedFilter(dv, key)
- if !proceed </span><span class="cov8" title="1">{
- return nil
- }</span>
- <span class="cov8" title="1">payload, _ := value.(Payload)
- klog.V(4).Infof("commonUpsertRanger key %s dv %#v", key, dv)
- podPath := dv.GetTargetPath()
- // So, what to do with error handling. Errors with filesystem operations
- // will almost always not be intermittent, but most likely the result of the
- // host filesystem either being full or compromised in some long-running fashion, so a tight-loop retry, like we
- // *could* do here, will typically prove fruitless.
- // Then, the controller relist will result in going through the secrets/configmaps we share, so
- // again, on the off chance the filesystem error is intermittent, or if an administrator has taken corrective
- // action, writing the content will be retried. And note, the relist interval is configurable (default 10 minutes)
- // if users want a more rapid retry...but by default, there is no tight-loop, more CPU-intensive retry
- // Lastly, with the understanding that an error log in the pod stdout may be missed, we will also generate a k8s
- // event to facilitate exposure
- // TODO: prometheus metrics/alerts may be desired here, though some due diligence on what k8s level metrics/alerts
- // around host filesystem issues might already exist would be warranted with such an exploration/effort
- // Next, on an update we first nuke any existing directory and then recreate it to simplify handling the case where
- // the keys in the secret/configmap have changed such that some keys have been removed, which would translate
- // in files having to be removed. commonOSRemove will handle shares mounted off of shares. And a reminder,
- // currently this driver does not support overlaying over directories with files. Either the directory in the
- // container image must be empty, or the directory does not exist, and is created for the Pod's container as
- // part of provisioning the container.
- if err := commonOSRemove(podPath, fmt.Sprintf("commonUpsertRanger key %s volid %s share id %s pod name %s", key, dv.GetVolID(), dv.GetSharedDataId(), dv.GetPodName())); err != nil </span><span class="cov0" title="0">{
- return err
- }</span>
- <span class="cov8" title="1">if err := os.MkdirAll(podPath, os.ModePerm); err != nil </span><span class="cov0" title="0">{
- return err
- }</span>
- <span class="cov8" title="1">if payload.ByteData != nil </span><span class="cov8" title="1">{
- for dataKey, dataValue := range payload.ByteData </span><span class="cov8" title="1">{
- podFilePath := filepath.Join(podPath, dataKey)
- klog.V(4).Infof("commonUpsertRanger create/update file %s key %s volid %s share id %s pod name %s", podFilePath, key, dv.GetVolID(), dv.GetSharedDataId(), dv.GetPodName())
- if err := ioutil.WriteFile(podFilePath, dataValue, 0644); err != nil </span><span class="cov0" title="0">{
- return err
- }</span>
- }
- }
- <span class="cov8" title="1">if payload.StringData != nil </span><span class="cov8" title="1">{
- for dataKey, dataValue := range payload.StringData </span><span class="cov8" title="1">{
- podFilePath := filepath.Join(podPath, dataKey)
- klog.V(4).Infof("commonUpsertRanger create/update file %s key %s volid %s share id %s pod name %s", podFilePath, key, dv.GetVolID(), dv.GetSharedDataId(), dv.GetPodName())
- content := []byte(dataValue)
- if err := ioutil.WriteFile(podFilePath, content, 0644); err != nil </span><span class="cov0" title="0">{
- return err
- }</span>
- }
- }
- <span class="cov8" title="1">klog.V(4).Infof("common upsert ranger returning key %s", key)
- return nil</span>
- }
- func commonOSRemove(dir, dbg string) error <span class="cov8" title="1">{
- klog.V(4).Infof("commonOSRemove to delete %q dbg %s", dir, dbg)
- defer klog.V(4).Infof("commonOSRemove completed delete attempt for dir %q", dir)
- // we cannot do an os.RemoveAll on the mount point, so we remove each file system entity
- // under the potential mount point individually
- return filepath.Walk(dir, func(path string, info os.FileInfo, err error) error </span><span class="cov8" title="1">{
- if info == nil </span><span class="cov0" title="0">{
- return nil
- }</span>
- // since we do not support mounting on existing content, a dir can only mean a share
- // has been mounted as a separate dir in our share, so skip
- <span class="cov8" title="1">if info.IsDir() </span><span class="cov8" title="1">{
- return nil
- }</span>
- <span class="cov8" title="1">fileName := filepath.Join(dir, info.Name())
- klog.V(4).Infof("commonOSRemove going to delete file %s", fileName)
- return os.RemoveAll(fileName)</span>
- })
- }
- func commonDeleteRanger(dv *driverVolume, key interface{}) bool <span class="cov8" title="1">{
- proceed := commonRangerProceedFilter(dv, key)
- if !proceed </span><span class="cov0" title="0">{
- // even though we are aborting, return true to continue to next entry in ranger list
- return true
- }</span>
- <span class="cov8" title="1">klog.V(4).Infof("common delete ranger key %s", key)
- commonOSRemove(dv.GetTargetPath(), fmt.Sprintf("commonDeleteRanger %s", key))
- klog.V(4).Infof("common delete ranger returning key %s", key)
- return true</span>
- }
- type innerShareDeleteRanger struct {
- shareId string
- }
- func (r *innerShareDeleteRanger) Range(key, value interface{}) bool <span class="cov8" title="1">{
- targetPath := ""
- volID := key.(string)
- // painful debug has shown you cannot trust the value that comes in, you have to refetch,
- // unless the map only has 1 entry in it
- var dv *driverVolume
- klog.V(4).Infof("innerShareDeleteRanger key %q\n incoming share id %s",
- key,
- r.shareId)
- dvObj, ok := volumes.Load(key)
- if !ok </span><span class="cov0" title="0">{
- klog.V(0).Infof("innerShareDeleteRanger how the hell can we not load key %s from the range list", key)
- // continue to the next entry, skip this one
- return true
- }</span> else<span class="cov8" title="1"> {
- dv, _ = dvObj.(*driverVolume)
- }</span>
- <span class="cov8" title="1">if dv.GetVolID() == volID && dv.GetSharedDataId() == r.shareId </span><span class="cov8" title="1">{
- klog.V(4).Infof("innerShareDeleteRanger shareid %s kind %s", r.shareId, dv.GetSharedDataKind())
- targetPath = dv.GetTargetPath()
- volID = dv.GetVolID()
- if len(volID) > 0 && len(targetPath) > 0 </span><span class="cov8" title="1">{
- err := commonOSRemove(targetPath, fmt.Sprintf("innerShareDeleteRanger shareID id %s", r.shareId))
- if err != nil </span><span class="cov0" title="0">{
- klog.Warningf("innerShareDeleteRanger %s vol %s target path %s delete error %s",
- r.shareId, volID, targetPath, err.Error())
- }</span>
- // we just delete the associated data from the previously provisioned volume;
- // we don't delete the volume in case the share is added back
- }
- <span class="cov8" title="1">return false</span>
- }
- <span class="cov0" title="0">return true</span>
- }
- func shareDeleteRanger(key interface{}) bool <span class="cov8" title="1">{
- shareId := key.(string)
- klog.V(4).Infof("shareDeleteRanger shareID id %s", shareId)
- ranger := &innerShareDeleteRanger{
- shareId: shareId,
- }
- volumes.Range(ranger.Range)
- klog.V(4).Infof("shareDeleteRanger returning share id %s", shareId)
- return true
- }</span>
- type innerShareUpdateRanger struct {
- shareId string
- secret bool
- configmap bool
- oldTargetPath string
- sharedItemKey string
- volID string
- sharedItem Payload
- }
- func (r *innerShareUpdateRanger) Range(key, value interface{}) bool <span class="cov8" title="1">{
- volID := key.(string)
- // painful debug has shown you cannot trust the value that comes in, you have to refetch,
- // unless the map only has 1 entry in it
- var dv *driverVolume
- klog.V(4).Infof("innerShareUpdateRanger key %q\n incoming share id %s",
- key,
- r.shareId)
- dvObj, ok := volumes.Load(key)
- if !ok </span><span class="cov0" title="0">{
- klog.V(0).Infof("innerShareUpdateRanger how the hell can we not load key %s from the range list", key)
- // continue to the next entry, skip this one
- return true
- }</span> else<span class="cov8" title="1"> {
- dv, _ = dvObj.(*driverVolume)
- }</span>
- <span class="cov8" title="1">if dv.GetVolID() == volID && dv.GetSharedDataId() == r.shareId </span><span class="cov8" title="1">{
- klog.V(4).Infof("innerShareUpdateRanger MATCH inner ranger key %q\n dv vol id %s\n incoming share id %s\n dv share id %s", key, dv.GetVolID(), r.shareId, dv.GetSharedDataId())
- a, err := client.ExecuteSAR(r.shareId, dv.GetPodNamespace(), dv.GetPodName(), dv.GetPodSA(), dv.GetSharedDataKind())
- allowed := a && err == nil
- if allowed </span><span class="cov8" title="1">{
- klog.V(0).Infof("innerShareUpdateRanger pod %s:%s has permissions for secretShare %s",
- dv.GetPodNamespace(), dv.GetPodName(), r.shareId)
- }</span> else<span class="cov8" title="1"> {
- klog.V(0).Infof("innerShareUpdateRanger pod %s:%s does not permission for secretShare %s",
- dv.GetPodNamespace(), dv.GetPodName(), r.shareId)
- }</span>
- <span class="cov8" title="1">switch </span>{
- case r.secret:<span class="cov8" title="1">
- sharedSecret := client.GetSharedSecret(r.shareId)
- if sharedSecret == nil </span><span class="cov0" title="0">{
- klog.Warningf("innerShareUpdateRanger unexpected not found on sharedSecret lister refresh: %s", r.shareId)
- return false
- }</span>
- <span class="cov8" title="1">r.sharedItemKey = objcache.BuildKey(sharedSecret.Spec.SecretRef.Namespace, sharedSecret.Spec.SecretRef.Name)
- secretObj := client.GetSecret(sharedSecret.Spec.SecretRef.Namespace, sharedSecret.Spec.SecretRef.Name)
- if secretObj == nil </span><span class="cov0" title="0">{
- klog.Infof("innerShareUpdateRanger share %s could not retrieve shared item %s", r.shareId, r.sharedItemKey)
- return false
- }</span>
- <span class="cov8" title="1">r.sharedItem = Payload{
- ByteData: secretObj.Data,
- StringData: secretObj.StringData,
- }</span>
- case r.configmap:<span class="cov8" title="1">
- sharedConfigMap := client.GetSharedConfigMap(r.shareId)
- if sharedConfigMap == nil </span><span class="cov0" title="0">{
- klog.Warningf("innerShareUpdateRanger unexpected not found on sharedConfigMap lister refresh: %s", r.shareId)
- return false
- }</span>
- <span class="cov8" title="1">r.sharedItemKey = objcache.BuildKey(sharedConfigMap.Spec.ConfigMapRef.Namespace, sharedConfigMap.Spec.ConfigMapRef.Name)
- cmObj := client.GetConfigMap(sharedConfigMap.Spec.ConfigMapRef.Namespace, sharedConfigMap.Spec.ConfigMapRef.Name)
- if cmObj == nil </span><span class="cov8" title="1">{
- klog.Infof("innerShareUpdateRanger share %s could not retrieve shared item %s", r.shareId, r.sharedItemKey)
- return false
- }</span>
- <span class="cov8" title="1">r.sharedItem = Payload{
- StringData: cmObj.Data,
- ByteData: cmObj.BinaryData,
- }</span>
- }
- <span class="cov8" title="1">r.oldTargetPath = dv.GetTargetPath()
- r.volID = dv.GetVolID()
- if !allowed </span><span class="cov8" title="1">{
- err := commonOSRemove(r.oldTargetPath, "lostPermissions")
- if err != nil </span><span class="cov0" title="0">{
- klog.Warningf("innerShareUpdateRanger %s target path %s delete error %s",
- key, r.oldTargetPath, err.Error())
- }</span>
- <span class="cov8" title="1">objcache.UnregisterSecretUpsertCallback(r.volID)
- objcache.UnregisterSecretDeleteCallback(r.volID)
- objcache.UnregisterConfigMapDeleteCallback(r.volID)
- objcache.UnregisterConfigMapUpsertCallback(r.volID)
- return false</span>
- }
- <span class="cov8" title="1">commonUpsertRanger(dv, r.sharedItemKey, r.sharedItem)</span>
- }
- <span class="cov8" title="1">klog.V(4).Infof("innerShareUpdateRanger NO MATCH inner ranger key %q\n dv vol id %s\n incoming share id %s\n dv share id %s", key, dv.GetVolID(), r.shareId, dv.GetSharedDataId())
- return true</span>
- }
- func shareUpdateRanger(key, value interface{}) bool <span class="cov8" title="1">{
- shareId := key.(string)
- _, sok := value.(*sharev1alpha1.SharedSecret)
- _, cmok := value.(*sharev1alpha1.SharedConfigMap)
- if !sok && !cmok </span><span class="cov0" title="0">{
- klog.Warningf("unknown shareUpdateRanger key %q object %#v", key, value)
- return false
- }</span>
- <span class="cov8" title="1">klog.V(4).Infof("shareUpdateRanger key %s secret %v configmap %v", key, sok, cmok)
- rangerObj := &innerShareUpdateRanger{
- shareId: shareId,
- secret: sok,
- configmap: cmok,
- }
- volumes.Range(rangerObj.Range)
- klog.V(4).Infof("shareUpdateRanger key %s value %#v inner ranger %#v inner ranger", key, value, rangerObj)
- return true</span>
- }
- func mapBackingResourceToPod(dv *driverVolume) error <span class="cov8" title="1">{
- klog.V(4).Infof("mapBackingResourceToPod")
- switch dv.GetSharedDataKind() </span>{
- case consts.ResourceReferenceTypeConfigMap:<span class="cov8" title="1">
- klog.V(4).Infof("mapBackingResourceToPod postlock %s configmap", dv.GetVolID())
- upsertRangerCM := func(key, value interface{}) bool </span><span class="cov0" title="0">{
- cm, _ := value.(*corev1.ConfigMap)
- payload := Payload{
- StringData: cm.Data,
- ByteData: cm.BinaryData,
- }
- err := commonUpsertRanger(dv, key, payload)
- if err != nil </span><span class="cov0" title="0">{
- ProcessFileSystemError(cm, err)
- }</span>
- // we always return true in the golang ranger to still attempt additional items
- // on the off chance the filesystem error received was intermittent and other items
- // will succeed ... remember, the ranger predominantly deals with pushing secret/configmap
- // updates to disk
- <span class="cov0" title="0">return true</span>
- }
- // we call the upsert ranger inline in case there are filesystem problems initially, so
- // we can return the error back to volume provisioning, where the kubelet will retry at
- // a controlled frequency
- <span class="cov8" title="1">sharedConfigMap := client.GetSharedConfigMap(dv.GetSharedDataId())
- if sharedConfigMap == nil </span><span class="cov0" title="0">{
- klog.V(4).Infof("mapBackingResourceToPod for pod volume %s:%s:%s share %s no longer exists", dv.GetPodNamespace(), dv.GetPodName(), dv.GetVolID(), dv.GetSharedDataId())
- return nil
- }</span>
- <span class="cov8" title="1">cmNamespace := sharedConfigMap.Spec.ConfigMapRef.Namespace
- cmName := sharedConfigMap.Spec.ConfigMapRef.Name
- comboKey := objcache.BuildKey(cmNamespace, cmName)
- cm := client.GetConfigMap(cmNamespace, cmName)
- if cm != nil </span><span class="cov8" title="1">{
- payload := Payload{
- StringData: cm.Data,
- ByteData: cm.BinaryData,
- }
- upsertError := commonUpsertRanger(dv, comboKey, payload)
- if upsertError != nil </span><span class="cov0" title="0">{
- ProcessFileSystemError(cm, upsertError)
- return upsertError
- }</span>
- }
- <span class="cov8" title="1">if dv.IsRefresh() </span><span class="cov8" title="1">{
- objcache.RegisterConfigMapUpsertCallback(dv.GetVolID(), comboKey, upsertRangerCM)
- }</span>
- <span class="cov8" title="1">deleteRangerCM := func(key, value interface{}) bool </span><span class="cov8" title="1">{
- return commonDeleteRanger(dv, key)
- }</span>
- // we should register delete callbacks regardless of any per-volume refresh setting to account for removed permissions
- <span class="cov8" title="1">objcache.RegisterConfigMapDeleteCallback(dv.GetVolID(), deleteRangerCM)</span>
- case consts.ResourceReferenceTypeSecret:<span class="cov8" title="1">
- klog.V(4).Infof("mapBackingResourceToPod postlock %s secret", dv.GetVolID())
- upsertRangerSec := func(key, value interface{}) bool </span><span class="cov0" title="0">{
- s, _ := value.(*corev1.Secret)
- payload := Payload{
- ByteData: s.Data,
- }
- err := commonUpsertRanger(dv, key, payload)
- if err != nil </span><span class="cov0" title="0">{
- ProcessFileSystemError(s, err)
- }</span>
- // we always return true in the golang ranger to still attempt additional items
- // on the off chance the filesystem error received was intermittent and other items
- // will succeed ... remember, the ranger predominantly deals with pushing secret/configmap
- // updates to disk
- <span class="cov0" title="0">return true</span>
- }
- // we call the upsert ranger inline in case there are filesystem problems initially, so
- // we can return the error back to volume provisioning, where the kubelet will retry at
- // a controlled frequency
- <span class="cov8" title="1">sharedSecret := client.GetSharedSecret(dv.GetSharedDataId())
- sNamespace := sharedSecret.Spec.SecretRef.Namespace
- sName := sharedSecret.Spec.SecretRef.Name
- comboKey := objcache.BuildKey(sNamespace, sName)
- s := client.GetSecret(sNamespace, sName)
- if s != nil </span><span class="cov8" title="1">{
- payload := Payload{
- ByteData: s.Data,
- }
- upsertError := commonUpsertRanger(dv, comboKey, payload)
- if upsertError != nil </span><span class="cov0" title="0">{
- ProcessFileSystemError(s, upsertError)
- return upsertError
- }</span>
- }
- <span class="cov8" title="1">if dv.IsRefresh() </span><span class="cov8" title="1">{
- objcache.RegisterSecretUpsertCallback(dv.GetVolID(), comboKey, upsertRangerSec)
- }</span>
- <span class="cov8" title="1">deleteRangerSec := func(key, value interface{}) bool </span><span class="cov8" title="1">{
- return commonDeleteRanger(dv, key)
- }</span>
- // we should register delete callbacks regardless of any per-volume refresh setting to account for removed permissions
- <span class="cov8" title="1">objcache.RegisterSecretDeleteCallback(dv.GetVolID(), deleteRangerSec)</span>
- default:<span class="cov0" title="0">
- return fmt.Errorf("invalid share backing resource kind %s", dv.GetSharedDataKind())</span>
- }
- <span class="cov8" title="1">return nil</span>
- }
- func (d *driver) mapVolumeToPod(dv *driverVolume) error <span class="cov8" title="1">{
- klog.V(4).Infof("mapVolumeToPod calling mapBackingResourceToPod")
- err := mapBackingResourceToPod(dv)
- if err != nil </span><span class="cov0" title="0">{
- return err
- }</span>
- <span class="cov8" title="1">d.registerRangers(dv)
- return nil</span>
- }
- func (d *driver) registerRangers(dv *driverVolume) <span class="cov8" title="1">{
- deleteRangerShare := func(key, value interface{}) bool </span><span class="cov8" title="1">{
- return shareDeleteRanger(key)
- }</span>
- <span class="cov8" title="1">updateRangerShare := func(key, value interface{}) bool </span><span class="cov8" title="1">{
- return shareUpdateRanger(key, value)
- }</span>
- <span class="cov8" title="1">switch dv.GetSharedDataKind() </span>{
- case consts.ResourceReferenceTypeSecret:<span class="cov8" title="1">
- objcache.RegisterSharedSecretUpdateCallback(dv.GetVolID(), dv.GetSharedDataId(), updateRangerShare)
- objcache.RegisteredSharedSecretDeleteCallback(dv.GetVolID(), deleteRangerShare)</span>
- case consts.ResourceReferenceTypeConfigMap:<span class="cov8" title="1">
- objcache.RegisterSharedConfigMapUpdateCallback(dv.GetVolID(), dv.GetSharedDataId(), updateRangerShare)
- objcache.RegisterSharedConfigMapDeleteCallback(dv.GetVolID(), deleteRangerShare)</span>
- }
- }
- // createVolume creates the directory for the csidriver volume.
- // It returns the driver volume, or an error if one occurs.
- func (d *driver) createVolume(volID, targetPath string, refresh bool, volCtx map[string]string, cmShare *sharev1alpha1.SharedConfigMap, sShare *sharev1alpha1.SharedSecret, cap int64, volAccessType accessType) (*driverVolume, error) <span class="cov8" title="1">{
- if cmShare != nil && sShare != nil </span><span class="cov0" title="0">{
- return nil, fmt.Errorf("cannot store both SharedConfigMap and SharedSecret in a volume")
- }</span>
- <span class="cov8" title="1">if cmShare == nil && sShare == nil </span><span class="cov0" title="0">{
- return nil, fmt.Errorf("have to provide either a SharedConfigMap or SharedSecret to a volume")
- }</span>
- <span class="cov8" title="1">dv := d.getVolume(volID)
- if dv != nil </span><span class="cov8" title="1">{
- klog.V(0).Infof("createVolume: create call came in for volume %s that we have already created; returning previously created instance", volID)
- return dv, nil
- }</span>
- <span class="cov8" title="1">anchorDir, bindDir := d.getVolumePath(volID, volCtx)
- switch volAccessType </span>{
- case mountAccess:<span class="cov8" title="1">
- err := os.MkdirAll(anchorDir, 0777)
- if err != nil </span><span class="cov0" title="0">{
- return nil, err
- }</span>
- default:<span class="cov8" title="1">
- return nil, fmt.Errorf("unsupported access type %v", volAccessType)</span>
- }
- <span class="cov8" title="1">podNamespace, podName, podUID, podSA := getPodDetails(volCtx)
- vol := CreateDV(volID)
- vol.SetVolSize(cap)
- vol.SetVolPathAnchorDir(anchorDir)
- vol.SetVolPathBindMountDir(bindDir)
- vol.SetVolAccessType(volAccessType)
- vol.SetTargetPath(targetPath)
- vol.SetPodNamespace(podNamespace)
- vol.SetPodName(podName)
- vol.SetPodUID(podUID)
- vol.SetPodSA(podSA)
- vol.SetRefresh(refresh)
- switch </span>{
- case cmShare != nil:<span class="cov8" title="1">
- vol.SetSharedDataKind(string(consts.ResourceReferenceTypeConfigMap))
- vol.SetSharedDataId(cmShare.Name)</span>
- case sShare != nil:<span class="cov8" title="1">
- vol.SetSharedDataKind(string(consts.ResourceReferenceTypeSecret))
- vol.SetSharedDataId(sShare.Name)</span>
- }
- <span class="cov8" title="1">return vol, nil</span>
- }
- func isDirEmpty(name string) (bool, error) <span class="cov0" title="0">{
- f, err := os.Open(name)
- if err != nil </span><span class="cov0" title="0">{
- klog.Warningf("error opening %s during empty check: %s", name, err.Error())
- return false, err
- }</span>
- <span class="cov0" title="0">defer f.Close()
- _, err = f.Readdirnames(1) // Or f.Readdir(1)
- if err == io.EOF </span><span class="cov0" title="0">{
- return true, nil
- }</span>
- <span class="cov0" title="0">return false, err</span> // Either not empty or error, suits both cases
- }
- func deleteIfEmpty(name string) <span class="cov0" title="0">{
- if empty, err := isDirEmpty(name); empty && err == nil </span><span class="cov0" title="0">{
- klog.V(4).Infof("deleteIfEmpty %s", name)
- err = os.RemoveAll(name)
- if err != nil </span><span class="cov0" title="0">{
- klog.Warningf("error deleting %s: %s", name, err.Error())
- }</span>
- }
- }
- func (d *driver) innerDeleteVolume(top string) <span class="cov0" title="0">{
- // reminder, path is filepath.Join(DataRoot, [anchor-dir | bind-dir], volID, podNamespace, podName, podUID, podSA)
- // delete SA dir
- klog.V(4).Infof("innerDeleteVolume %s", top)
- err := os.RemoveAll(top)
- if err != nil </span><span class="cov0" title="0">{
- klog.Warningf("error deleting %s: %s", top, err.Error())
- }</span>
- <span class="cov0" title="0">currentLocation := top
- // we deleteIfEmpty on the remaining 4 levels
- for i := 0; i < 4; i++ </span><span class="cov0" title="0">{
- parentDir := filepath.Dir(currentLocation)
- deleteIfEmpty(parentDir)
- currentLocation = parentDir
- }</span>
- }
- // deleteVolume deletes the directory for the csidriver volume.
- func (d *driver) deleteVolume(volID string) error <span class="cov8" title="1">{
- klog.V(4).Infof("deleting csidriver volume: %s", volID)
- if dv := d.getVolume(volID); dv != nil </span><span class="cov8" title="1">{
- klog.V(4).Infof("found volume: %s", volID)
- os.RemoveAll(dv.GetTargetPath())
- remV(volID)
- }</span>
- <span class="cov8" title="1">objcache.UnregisterSecretUpsertCallback(volID)
- objcache.UnregisterSecretDeleteCallback(volID)
- objcache.UnregisterConfigMapUpsertCallback(volID)
- objcache.UnregisterConfigMapDeleteCallback(volID)
- objcache.UnregisterSharedConfigMapDeleteCallback(volID)
- objcache.UnregisterSharedConfigMapUpdateCallback(volID)
- objcache.UnregisterSharedSecretDeleteCallback(volID)
- objcache.UnregsiterSharedSecretsUpdateCallback(volID)
- return nil</span>
- }
- func (d *driver) loadVolsFromDisk() error <span class="cov8" title="1">{
- klog.V(2).Infof("loadVolsFromDisk")
- defer klog.V(2).Infof("loadVolsFromDisk exit")
- return filepath.Walk(d.volMapRoot, func(path string, info os.FileInfo, err error) error </span><span class="cov8" title="1">{
- if info == nil </span><span class="cov0" title="0">{
- return nil
- }</span>
- <span class="cov8" title="1">if err != nil </span><span class="cov0" title="0">{
- // continue to next file
- return nil
- }</span>
- <span class="cov8" title="1">if info.IsDir() </span><span class="cov8" title="1">{
- return nil
- }</span>
- <span class="cov0" title="0">fileName := filepath.Join(d.volMapRoot, info.Name())
- dataFile, oerr := os.Open(fileName)
- if oerr != nil </span><span class="cov0" title="0">{
- klog.V(0).Infof("loadVolsFromDisk error opening file %s: %s", fileName, err.Error())
- // continue to next file
- return nil
- }</span>
- <span class="cov0" title="0">dataDecoder := json.NewDecoder(dataFile)
- dv := &driverVolume{}
- err = dataDecoder.Decode(dv)
- if err != nil </span><span class="cov0" title="0">{
- klog.V(0).Infof("loadVolsFromDisk error decoding file %s: %s", fileName, err.Error())
- // continue to next file
- return nil
- }</span>
- <span class="cov0" title="0">if dv == nil </span><span class="cov0" title="0">{
- klog.V(0).Infof("loadVolsFromDisk nil but no error for file %s", fileName)
- // continue to next file
- return nil
- }</span>
- <span class="cov0" title="0">dv.Lock = &sync.Mutex{}
- if filepath.Base(fileName) != dv.GetVolID() </span><span class="cov0" title="0">{
- klog.Warningf("loadVolsFromDisk file %s had vol id %s - corrupted !!!", dv.GetVolID())
- return nil
- }</span>
- <span class="cov0" title="0">klog.V(2).Infof("loadVolsFromDisk storing with key %s dv %#v", dv.GetVolID(), dv)
- setDPV(dv.GetVolID(), dv)
- d.registerRangers(dv)
- return nil</span>
- })
- }
- // Prune inspects all the volumes stored on disk and checks whether their associated pods still exist. If not, the volume
- // file in question is deleted from disk.
- func (d *driver) Prune(kubeClient kubernetes.Interface) <span class="cov8" title="1">{
- filesToPrune := map[string]driverVolume{}
- filepath.Walk(d.volMapRoot, func(path string, info os.FileInfo, err error) error </span><span class="cov8" title="1">{
- if info == nil </span><span class="cov0" title="0">{
- return nil
- }</span>
- <span class="cov8" title="1">if err != nil </span><span class="cov0" title="0">{
- // continue to next file
- klog.V(5).Infof("Prune: for path %s given error %s", path, err.Error())
- return nil
- }</span>
- <span class="cov8" title="1">if info.IsDir() </span><span class="cov8" title="1">{
- return nil
- }</span>
- <span class="cov8" title="1">fileName := filepath.Join(d.volMapRoot, info.Name())
- dataFile, oerr := os.Open(fileName)
- if oerr != nil </span><span class="cov0" title="0">{
- klog.V(0).Infof("loadVolsFromDisk error opening file %s: %s", fileName, err.Error())
- // continue to next file
- return nil
- }</span>
- <span class="cov8" title="1">dataDecoder := json.NewDecoder(dataFile)
- dv := &driverVolume{}
- err = dataDecoder.Decode(dv)
- if err != nil </span><span class="cov0" title="0">{
- klog.V(0).Infof("loadVolsFromDisk error decoding file %s: %s", fileName, err.Error())
- // continue to next file
- return nil
- }</span>
- <span class="cov8" title="1">if dv == nil </span><span class="cov0" title="0">{
- klog.V(0).Infof("loadVolsFromDisk nil but no error for file %s", fileName)
- // continue to next file
- return nil
- }</span>
- <span class="cov8" title="1">dv.Lock = &sync.Mutex{}
- _, err = kubeClient.CoreV1().Pods(dv.GetPodNamespace()).Get(context.TODO(), dv.GetPodName(), metav1.GetOptions{})
- if err != nil && kerrors.IsNotFound(err) </span><span class="cov8" title="1">{
- klog.V(2).Infof("pruner: dv %q: %s", fileName, err.Error())
- filesToPrune[fileName] = *dv
- }</span>
- <span class="cov8" title="1">return nil</span>
- })
- <span class="cov8" title="1">if len(filesToPrune) == 0 </span><span class="cov0" title="0">{
- return
- }</span>
- // a bit paranoid, but we avoid deleting files inside the walk loop in case doing so interferes with filepath.Walk's iteration logic
- <span class="cov8" title="1">for file, dv := range filesToPrune </span><span class="cov8" title="1">{
- err := os.Remove(file)
- if err != nil </span><span class="cov0" title="0">{
- klog.Warningf("pruner: unable to prune file %q: %s", file, err.Error())
- continue</span>
- }
- <span class="cov8" title="1">klog.V(2).Infof("pruner: removed volume file %q with missing pod from disk", file)
- if d.mounter != nil </span><span class="cov8" title="1">{
- err = d.mounter.Unmount(dv.GetVolPathAnchorDir())
- if err != nil </span><span class="cov0" title="0">{
- klog.Warningf("pruner: issue unmounting for volume %s mount id %s: %s", dv.GetVolID(), dv.GetVolPathAnchorDir(), err.Error())
- }</span> else<span class="cov8" title="1"> {
- klog.V(2).Infof("pruner: successfully unmounted volume %s mount id %s", dv.GetVolID(), dv.GetVolPathAnchorDir())
- }</span>
- }
- }
- }
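- // pruneLoop is a hypothetical wiring sketch, not part of the original file, showing one way a
- // caller could run Prune periodically until stopCh closes (assumes the standard "time" package
- // is imported in this file).
- func (d *driver) pruneLoop(kubeClient kubernetes.Interface, interval time.Duration, stopCh <-chan struct{}) {
-     ticker := time.NewTicker(interval)
-     defer ticker.Stop()
-     for {
-         select {
-         case <-ticker.C:
-             d.Prune(kubeClient)
-         case <-stopCh:
-             return
-         }
-     }
- }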
- </pre>
- <pre class="file" id="file5" style="display: none">package csidriver
- import (
- "fmt"
- corev1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/klog/v2"
- "github.com/openshift/csi-driver-shared-resource/pkg/client"
- )
- type Payload struct {
- StringData map[string]string
- ByteData map[string][]byte
- }
- func ProcessFileSystemError(obj runtime.Object, err error) <span class="cov0" title="0">{
- msg := fmt.Sprintf("%s", err.Error())
- klog.Error(msg)
- client.GetRecorder().Eventf(obj, corev1.EventTypeWarning, "FileSystemError", msg)
- }</span>
- </pre>
- <pre class="file" id="file6" style="display: none">/*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package csidriver
- import (
- "github.com/container-storage-interface/spec/lib/go/csi"
- "golang.org/x/net/context"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "k8s.io/klog/v2"
- )
- type identityServer struct {
- name string
- version string
- }
- func NewIdentityServer(name, version string) *identityServer <span class="cov0" title="0">{
- return &identityServer{
- name: name,
- version: version,
- }
- }</span>
- func (ids *identityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) <span class="cov0" title="0">{
- klog.V(5).Infof("Using default GetPluginInfo")
- if ids.name == "" </span><span class="cov0" title="0">{
- return nil, status.Error(codes.Unavailable, "Driver name not configured")
- }</span>
- <span class="cov0" title="0">if ids.version == "" </span><span class="cov0" title="0">{
- return nil, status.Error(codes.Unavailable, "Driver is missing version")
- }</span>
- <span class="cov0" title="0">return &csi.GetPluginInfoResponse{
- Name: ids.name,
- VendorVersion: ids.version,
- }, nil</span>
- }
- func (ids *identityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) <span class="cov0" title="0">{
- return &csi.ProbeResponse{}, nil
- }</span>
- func (ids *identityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) <span class="cov0" title="0">{
- klog.V(5).Infof("Using default capabilities")
- return &csi.GetPluginCapabilitiesResponse{
- Capabilities: []*csi.PluginCapability{
- // Even though this plugin runs on every node via a DaemonSet, it does not guarantee that the
- // *same* volume is present on all nodes. Equivalent data can appear on different nodes via
- // different volumes, because pods residing on different nodes may have access to the same
- // subset of shared configmaps/secrets, but that does not satisfy the definition of
- // csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS
- {
- Type: &csi.PluginCapability_Service_{
- Service: &csi.PluginCapability_Service{
- Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS,
- },
- },
- },
- },
- }, nil
- }</span>
- </pre>
- <pre class="file" id="file7" style="display: none">package csidriver
- import (
- "fmt"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "k8s.io/utils/mount"
- )
- type FileSystemMounter interface {
- makeFSMounts(mountIDString, intermediateBindMountDir, kubeletTargetDir string, mounter mount.Interface) error
- removeFSMounts(mountIDString, intermediateBindMountDir, kubeletTargetDir string, mount mount.Interface) error
- }
- // ReadWriteMany high-level details:
- //
- // This is our original approach to mounting the file system this driver manipulates
- // onto the location the kubelet has allocated for the CSI volume in question.
- //
- // We go straight from our "identifier" string, based on input from Jan, to the kubelet's target directory. No bind mounts.
- // But this approach only works if the K8s CSIVolumeSource sets readOnly to false. If readOnly
- // is set to true, the underlying mount mechanics between our call to m.mounter.Mount and what
- // the kubelet does for the Pod result in the use of xfs for the filesystem and an inability for the
- // Pod to read what we have mounted.
- //
- // Additional details:
- //
- // Our intent here is to have a separate tmpfs per pod. Through experimentation
- // and corroboration with OpenShift storage SMEs, a separate tmpfs per pod:
- // - ensures the kubelet will handle SELinux for us; it relabels the volume in "the right way" just for the pod
- // - avoids the situation, flagged with all sorts of warnings by the SMEs, where pods share the same host dir
- // - provides the obvious isolation between pods that implies
- // We cannot mount read-only, since we have to copy the data after the mount; otherwise we get errors
- // that the filesystem is read-only.
- // However, we can restart this driver, leave up any live Pods using our volume, and still update the content
- // after this driver comes back up.
- // The various bits that work in concert to achieve this (see the sketch after the ReadWriteMany type below):
- // - using emptyDir with a medium of Memory in this driver's Deployment is all that is needed to get tmpfs
- // - do not use the "bind" option; that reuses existing dirs/filesystems instead of creating a new tmpfs
- // - without bind, we have to specify an fstype of tmpfs and a path for the mount source, or we get errors on the
- // Mount about the fs not being block access
- // - that said, testing confirmed that using an fstype of tmpfs on hostpath/xfs volumes still results in the target
- // being xfs and not tmpfs
- // - without a bind option, and with each pod getting its own tmpfs, we have to copy the data from our emptyDir
- // based location to the targetPath here ... that is handled in driver.go
- type ReadWriteMany struct {
- }
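- // illustrateTmpfsVsBind is a hypothetical sketch, not part of the original file, restating the
- // trade-off described in the comment above in code form: a "bind" mount reuses the source
- // directory's existing filesystem, whereas specifying an fstype of tmpfs with no bind option
- // gives the kubelet target its own tmpfs, which is what makeFSMounts below relies on.
- func illustrateTmpfsVsBind(mounter mount.Interface, source, target string) error {
-     // Rejected approach: bind reuses whatever filesystem backs 'source' (e.g. xfs on a hostPath).
-     // _ = mounter.Mount(source, target, "", []string{"bind"})
-
-     // Approach used by this driver: a dedicated tmpfs per pod target.
-     return mounter.Mount(source, target, "tmpfs", []string{})
- }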
- func (m *ReadWriteMany) makeFSMounts(mountIDString, intermediateBindMountDir, kubeletTargetDir string, mounter mount.Interface) error <span class="cov8" title="1">{
- options := []string{}
- if err := mounter.Mount(mountIDString, kubeletTargetDir, "tmpfs", options); err != nil </span><span class="cov0" title="0">{
- return status.Error(codes.Internal, fmt.Sprintf("failed to mount device: %s at %s: %s",
- mountIDString,
- kubeletTargetDir,
- err.Error()))
- }</span>
- <span class="cov8" title="1">return nil</span>
- }
- func (m *ReadWriteMany) removeFSMounts(mountIDString, intermediateBindMountDir, kubeletTargetDir string, mounter mount.Interface) error <span class="cov0" title="0">{
- // mount.CleanupMountPoint proved insufficient for us, as it always considered our mountIDString here "not a mount", even
- // though we could rsh into the driver container/pod and successfully run 'umount' by hand. If we did not unmount here,
- // the termination of pods using our CSI driver could hang. So we just call Unmount directly from our mounter
- // (a sketch contrasting the two approaches follows this function).
- if err := mounter.Unmount(mountIDString); err != nil </span><span class="cov0" title="0">{
- return status.Error(codes.Internal, fmt.Sprintf("failed to umount device: %s at %s and %s: %s",
- mountIDString,
- intermediateBindMountDir,
- kubeletTargetDir,
- err.Error()))
- }</span>
- <span class="cov0" title="0">return nil</span>
- }
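- // cleanupAlternative is a hypothetical sketch, not part of the original file, showing the generic
- // helper from k8s.io/utils/mount that removeFSMounts above deliberately avoids; in our testing it
- // reported the mount id as "not a mount" and skipped the unmount, which is why the driver calls
- // Unmount directly instead.
- func cleanupAlternative(mountIDString string, mounter mount.Interface) error {
-     // false disables the extensive mount-point check; either way it was insufficient for this driver.
-     return mount.CleanupMountPoint(mountIDString, mounter, false)
- }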
- </pre>
- <pre class="file" id="file8" style="display: none">/*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package csidriver
- import (
- "fmt"
- "os"
- "path/filepath"
- "strconv"
- "strings"
- "github.com/container-storage-interface/spec/lib/go/csi"
- sharev1alpha1 "github.com/openshift/api/sharedresource/v1alpha1"
- "github.com/openshift/csi-driver-shared-resource/pkg/client"
- "github.com/openshift/csi-driver-shared-resource/pkg/consts"
- "github.com/openshift/csi-driver-shared-resource/pkg/metrics"
- "golang.org/x/net/context"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "k8s.io/klog/v2"
- "k8s.io/utils/mount"
- )
- var (
- listers client.Listers
- )
- type nodeServer struct {
- nodeID string
- maxVolumesPerNode int64
- d CSIDriver
- readWriteMounter FileSystemMounter
- mounter mount.Interface
- }
- func NewNodeServer(d *driver) *nodeServer <span class="cov0" title="0">{
- return &nodeServer{
- nodeID: d.nodeID,
- maxVolumesPerNode: d.maxVolumesPerNode,
- d: d,
- mounter: mount.New(""),
- readWriteMounter: &ReadWriteMany{},
- }
- }</span>
- func getPodDetails(volumeContext map[string]string) (string, string, string, string) <span class="cov8" title="1">{
- podName, _ := volumeContext[CSIPodName]
- podNamespace, _ := volumeContext[CSIPodNamespace]
- podSA, _ := volumeContext[CSIPodSA]
- podUID, _ := volumeContext[CSIPodUID]
- return podNamespace, podName, podUID, podSA
- }</span>
- func (ns *nodeServer) validateShare(req *csi.NodePublishVolumeRequest) (*sharev1alpha1.SharedConfigMap, *sharev1alpha1.SharedSecret, error) <span class="cov8" title="1">{
- configMapShareName, cmok := req.GetVolumeContext()[SharedConfigMapShareKey]
- secretShareName, sok := req.GetVolumeContext()[SharedSecretShareKey]
- if (!cmok && !sok) || (len(strings.TrimSpace(configMapShareName)) == 0 && len(strings.TrimSpace(secretShareName)) == 0) </span><span class="cov8" title="1">{
- return nil, nil, status.Errorf(codes.InvalidArgument,
- "the csi driver reference is missing the volumeAttribute %q and %q", SharedSecretShareKey, SharedConfigMapShareKey)
- }</span>
- <span class="cov8" title="1">if (cmok && sok) || (len(strings.TrimSpace(configMapShareName)) > 0 && len(strings.TrimSpace(secretShareName)) > 0) </span><span class="cov8" title="1">{
- return nil, nil, status.Errorf(codes.InvalidArgument,
- "a single volume cannot support both a SharedConfigMap reference %q and SharedSecret reference %q",
- configMapShareName, secretShareName)
- }</span>
- <span class="cov8" title="1">var cmShare *sharev1alpha1.SharedConfigMap
- var sShare *sharev1alpha1.SharedSecret
- var err error
- allowed := false
- if len(configMapShareName) > 0 </span><span class="cov8" title="1">{
- cmShare, err = client.GetListers().SharedConfigMaps.Get(configMapShareName)
- if err != nil </span><span class="cov0" title="0">{
- return nil, nil, status.Errorf(codes.InvalidArgument,
- "the csi driver volumeAttribute %q reference had an error: %s", configMapShareName, err.Error())
- }</span>
- }
- <span class="cov8" title="1">if len(secretShareName) > 0 </span><span class="cov8" title="1">{
- sShare, err = client.GetListers().SharedSecrets.Get(secretShareName)
- if err != nil </span><span class="cov0" title="0">{
- return nil, nil, status.Errorf(codes.InvalidArgument,
- "the csi driver volumeAttribute %q reference had an error: %s", secretShareName, err.Error())
- }</span>
- }
- <span class="cov8" title="1">if sShare == nil && cmShare == nil </span><span class="cov0" title="0">{
- return nil, nil, status.Errorf(codes.InvalidArgument,
- "volumeAttributes did not reference a valid SharedSecret or SharedConfigMap")
- }</span>
- <span class="cov8" title="1">podNamespace, podName, _, podSA := getPodDetails(req.GetVolumeContext())
- shareName := ""
- kind := consts.ResourceReferenceTypeConfigMap
- if cmShare != nil </span><span class="cov8" title="1">{
- if len(strings.TrimSpace(cmShare.Spec.ConfigMapRef.Namespace)) == 0 </span><span class="cov8" title="1">{
- return nil, nil, status.Errorf(codes.InvalidArgument,
- "the SharedConfigMap %q backing resource namespace needs to be set", configMapShareName)
- }</span>
- <span class="cov8" title="1">if len(strings.TrimSpace(cmShare.Spec.ConfigMapRef.Name)) == 0 </span><span class="cov8" title="1">{
- return nil, nil, status.Errorf(codes.InvalidArgument,
- "the SharedConfigMap %q backing resource name needs to be set", configMapShareName)
- }</span>
- <span class="cov8" title="1">shareName = configMapShareName</span>
- }
- <span class="cov8" title="1">if sShare != nil </span><span class="cov8" title="1">{
- kind = consts.ResourceReferenceTypeSecret
- if len(strings.TrimSpace(sShare.Spec.SecretRef.Namespace)) == 0 </span><span class="cov8" title="1">{
- return nil, nil, status.Errorf(codes.InvalidArgument,
- "the SharedSecret %q backing resource namespace needs to be set", secretShareName)
- }</span>
- <span class="cov8" title="1">if len(strings.TrimSpace(sShare.Spec.SecretRef.Name)) == 0 </span><span class="cov8" title="1">{
- return nil, nil, status.Errorf(codes.InvalidArgument,
- "the SharedSecret %q backing resource name needs to be set", secretShareName)
- }</span>
- <span class="cov8" title="1">shareName = secretShareName</span>
- }
- <span class="cov8" title="1">allowed, err = client.ExecuteSAR(shareName, podNamespace, podName, podSA, kind)
- if allowed </span><span class="cov8" title="1">{
- return cmShare, sShare, nil
- }</span>
- <span class="cov8" title="1">return nil, nil, err</span>
- }
- // validateVolumeContext verifies the pod details, the ephemeral flag, and the mount access type supplied in the request's volume context.
- func (ns *nodeServer) validateVolumeContext(req *csi.NodePublishVolumeRequest) error <span class="cov8" title="1">{
- podNamespace, podName, podUID, podSA := getPodDetails(req.GetVolumeContext())
- klog.V(4).Infof("NodePublishVolume pod %s ns %s sa %s uid %s",
- podName,
- podNamespace,
- podSA,
- podUID)
- if len(podName) == 0 || len(podNamespace) == 0 || len(podUID) == 0 || len(podSA) == 0 </span><span class="cov8" title="1">{
- return status.Error(codes.InvalidArgument,
- fmt.Sprintf("Volume attributes missing required set for pod: namespace: %s name: %s uid: %s, sa: %s",
- podNamespace, podName, podUID, podSA))
- }</span>
- <span class="cov8" title="1">ephemeralVolume := req.GetVolumeContext()[CSIEphemeral] == "true" ||
- req.GetVolumeContext()[CSIEphemeral] == "" // Kubernetes 1.15 doesn't have csi.storage.k8s.io/ephemeral.
- if !ephemeralVolume </span><span class="cov8" title="1">{
- return status.Error(codes.InvalidArgument, "Non-ephemeral request made")
- }</span>
- <span class="cov8" title="1">if req.GetVolumeCapability().GetMount() == nil </span><span class="cov8" title="1">{
- return status.Error(codes.InvalidArgument, "only support mount access type")
- }</span>
- <span class="cov8" title="1">return nil</span>
- }
- func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) <span class="cov8" title="1">{
- var kubeletTargetPath string
- // Check arguments
- if req.GetVolumeCapability() == nil </span><span class="cov8" title="1">{
- return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
- }</span>
- <span class="cov8" title="1">if len(req.GetVolumeId()) == 0 </span><span class="cov8" title="1">{
- return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
- }</span>
- <span class="cov8" title="1">if len(req.GetTargetPath()) == 0 </span><span class="cov8" title="1">{
- return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
- }</span>
- <span class="cov8" title="1">if req.VolumeContext == nil || len(req.GetVolumeContext()) == 0 </span><span class="cov8" title="1">{
- return nil, status.Error(codes.InvalidArgument, "Volume attributes missing in request")
- }</span>
- <span class="cov8" title="1">err := ns.validateVolumeContext(req)
- if err != nil </span><span class="cov8" title="1">{
- return nil, err
- }</span>
- <span class="cov8" title="1">cmShare, sShare, err := ns.validateShare(req)
- if err != nil </span><span class="cov8" title="1">{
- return nil, err
- }</span>
- <span class="cov8" title="1">kubeletTargetPath = req.GetTargetPath()
- if !req.GetReadonly() </span><span class="cov8" title="1">{
- return nil, status.Error(codes.InvalidArgument, "The Shared Resource CSI driver requires all volume requests to set read-only to 'true'")
- }</span>
- <span class="cov8" title="1">attrib := req.GetVolumeContext()
- refresh := true
- refreshStr, rok := attrib[RefreshResource]
- if rok </span><span class="cov8" title="1">{
- r, e := strconv.ParseBool(refreshStr)
- if e == nil </span><span class="cov8" title="1">{
- refresh = r
- }</span>
- }
- <span class="cov8" title="1">vol, err := ns.d.createVolume(req.GetVolumeId(), kubeletTargetPath, refresh, req.GetVolumeContext(), cmShare, sShare, maxStorageCapacity, mountAccess)
- if err != nil && !os.IsExist(err) </span><span class="cov0" title="0">{
- klog.Error("ephemeral mode failed to create volume: ", err)
- return nil, status.Error(codes.Internal, err.Error())
- }</span>
- <span class="cov8" title="1">klog.V(4).Infof("NodePublishVolume created volume: %s", kubeletTargetPath)
- notMnt, err := mount.IsNotMountPoint(ns.mounter, kubeletTargetPath)
- if err != nil </span><span class="cov0" title="0">{
- if os.IsNotExist(err) </span><span class="cov0" title="0">{
- if err = os.MkdirAll(kubeletTargetPath, 0750); err != nil </span><span class="cov0" title="0">{
- return nil, status.Error(codes.Internal, err.Error())
- }</span>
- <span class="cov0" title="0">notMnt = true</span>
- } else<span class="cov0" title="0"> {
- return nil, status.Error(codes.Internal, err.Error())
- }</span>
- }
- // this means the mount.Mounter call has already happened
- <span class="cov8" title="1">if !notMnt </span><span class="cov0" title="0">{
- return &csi.NodePublishVolumeResponse{}, nil
- }</span>
- <span class="cov8" title="1">fsType := req.GetVolumeCapability().GetMount().GetFsType()
- deviceId := ""
- if req.GetPublishContext() != nil </span><span class="cov0" title="0">{
- deviceId = req.GetPublishContext()[deviceID]
- }</span>
- <span class="cov8" title="1">volumeId := req.GetVolumeId()
- mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
- klog.V(4).Infof("NodePublishVolume %v\nfstype %v\ndevice %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
- kubeletTargetPath, fsType, deviceId, volumeId, attrib, mountFlags)
- mountIDString, bindDir := ns.d.getVolumePath(req.GetVolumeId(), req.GetVolumeContext())
- if err := ns.readWriteMounter.makeFSMounts(mountIDString, bindDir, kubeletTargetPath, ns.mounter); err != nil </span><span class="cov0" title="0">{
- return nil, err
- }</span>
- // since we do *NOT* use bind on the mount (so each pod gets its own tmpfs), this call initiates the necessary copy of the data
- <span class="cov8" title="1">if err := ns.d.mapVolumeToPod(vol); err != nil </span><span class="cov0" title="0">{
- metrics.IncMountCounters(false)
- return nil, status.Error(codes.Internal, fmt.Sprintf("failed to populate mount device: %s at %s: %s",
- bindDir,
- kubeletTargetPath,
- err.Error()))
- }</span>
- <span class="cov8" title="1">if err := vol.StoreToDisk(ns.d.GetVolMapRoot()); err != nil </span><span class="cov0" title="0">{
- metrics.IncMountCounters(false)
- klog.Errorf("failed to persist driver volume metadata to disk: %s", err.Error())
- return nil, status.Error(codes.Internal, err.Error())
- }</span>
- <span class="cov8" title="1">metrics.IncMountCounters(true)
- return &csi.NodePublishVolumeResponse{}, nil</span>
- }
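- // minimalPublishRequest is a hypothetical sketch, not part of the original file, showing the
- // shape of a NodePublishVolumeRequest that satisfies the checks in NodePublishVolume above:
- // a mount access type, read-only set to true, pod details, and exactly one share key
- // (here a SharedSecret reference; the namespace, pod, path, and share names are made up).
- func minimalPublishRequest() *csi.NodePublishVolumeRequest {
-     return &csi.NodePublishVolumeRequest{
-         VolumeId:   "my-csi-volume",
-         TargetPath: "/var/lib/kubelet/pods/some-uid/volumes/kubernetes.io~csi/my-csi-volume/mount",
-         Readonly:   true,
-         VolumeCapability: &csi.VolumeCapability{
-             AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}},
-         },
-         VolumeContext: map[string]string{
-             CSIPodName:           "example-pod",
-             CSIPodNamespace:      "example-namespace",
-             CSIPodUID:            "some-uid",
-             CSIPodSA:             "default",
-             CSIEphemeral:         "true",
-             SharedSecretShareKey: "my-shared-secret",
-         },
-     }
- }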
- func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) <span class="cov0" title="0">{
- // Check arguments
- if len(req.GetVolumeId()) == 0 </span><span class="cov0" title="0">{
- return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
- }</span>
- <span class="cov0" title="0">if len(req.GetTargetPath()) == 0 </span><span class="cov0" title="0">{
- return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
- }</span>
- <span class="cov0" title="0">targetPath := req.GetTargetPath()
- volumeID := req.GetVolumeId()
- dv := ns.d.getVolume(volumeID)
- if dv == nil </span><span class="cov0" title="0">{
- return nil, status.Error(codes.Internal, fmt.Sprintf("unpublish volume %s already gone", volumeID))
- }</span>
- <span class="cov0" title="0">if err := ns.readWriteMounter.removeFSMounts(dv.GetVolPathAnchorDir(), dv.GetVolPathBindMountDir(), targetPath, ns.mounter); err != nil </span><span class="cov0" title="0">{
- return nil, status.Error(codes.Internal, fmt.Sprintf("error removing %s: %s", targetPath, err.Error()))
- }</span>
- <span class="cov0" title="0">klog.V(4).Infof("volume %s at path %s has been unpublished.", volumeID, targetPath)
- if err := ns.d.deleteVolume(volumeID); err != nil && !os.IsNotExist(err) </span><span class="cov0" title="0">{
- return nil, status.Error(codes.Internal, fmt.Sprintf("failed to delete volume: %s", err))
- }</span>
- <span class="cov0" title="0">filePath := filepath.Join(ns.d.GetVolMapRoot(), dv.GetVolID())
- if err := os.Remove(filePath); err != nil </span><span class="cov0" title="0">{
- klog.Errorf("failed to persist driver volume metadata to disk: %s", err.Error())
- return nil, status.Error(codes.Internal, err.Error())
- }</span>
- <span class="cov0" title="0">return &csi.NodeUnpublishVolumeResponse{}, nil</span>
- }
- func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) <span class="cov0" title="0">{
- return nil, status.Error(codes.Unimplemented, "")
- }</span>
- func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) <span class="cov0" title="0">{
- return nil, status.Error(codes.Unimplemented, "")
- }</span>
- func (ns *nodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) <span class="cov0" title="0">{
- topology := &csi.Topology{
- Segments: map[string]string{TopologyKeyNode: ns.nodeID},
- }
- return &csi.NodeGetInfoResponse{
- NodeId: ns.nodeID,
- MaxVolumesPerNode: ns.maxVolumesPerNode,
- AccessibleTopology: topology,
- }, nil
- }</span>
- func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) <span class="cov0" title="0">{
- return &csi.NodeGetCapabilitiesResponse{
- Capabilities: []*csi.NodeServiceCapability{},
- }, nil
- }</span>
- func (ns *nodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) <span class="cov0" title="0">{
- return nil, status.Error(codes.Unimplemented, "")
- }</span>
- // NodeExpandVolume is only implemented so the driver can be used for e2e testing.
- func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) <span class="cov0" title="0">{
- return nil, status.Error(codes.Unimplemented, "")
- }</span>
- </pre>
- <pre class="file" id="file9" style="display: none">/*
- Copyright 2019 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package csidriver
- import (
- "fmt"
- "net"
- "os"
- "strings"
- "sync"
- "golang.org/x/net/context"
- "google.golang.org/grpc"
- "github.com/container-storage-interface/spec/lib/go/csi"
- "github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
- "k8s.io/klog/v2"
- )
- func NewNonBlockingGRPCServer() *nonBlockingGRPCServer <span class="cov0" title="0">{
- return &nonBlockingGRPCServer{}
- }</span>
- // NonBlocking server
- type nonBlockingGRPCServer struct {
- wg sync.WaitGroup
- server *grpc.Server
- }
- func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, ns csi.NodeServer) <span class="cov0" title="0">{
- s.wg.Add(1)
- go s.serve(endpoint, ids, ns)
- return
- }</span>
- func (s *nonBlockingGRPCServer) Wait() <span class="cov0" title="0">{
- s.wg.Wait()
- }</span>
- func (s *nonBlockingGRPCServer) Stop() <span class="cov0" title="0">{
- s.server.GracefulStop()
- }</span>
- func (s *nonBlockingGRPCServer) ForceStop() <span class="cov0" title="0">{
- s.server.Stop()
- }</span>
- func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, ns csi.NodeServer) <span class="cov0" title="0">{
- proto, addr, err := parseEndpoint(endpoint)
- if err != nil </span><span class="cov0" title="0">{
- klog.Fatal(err.Error())
- }</span>
- <span class="cov0" title="0">if proto == "unix" </span><span class="cov0" title="0">{
- addr = "/" + addr
- if err := os.Remove(addr); err != nil && !os.IsNotExist(err) </span><span class="cov0" title="0">{ //nolint: vetshadow
- klog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
- }</span>
- }
- <span class="cov0" title="0">listener, err := net.Listen(proto, addr)
- if err != nil </span><span class="cov0" title="0">{
- klog.Fatalf("Failed to listen: %v", err)
- }</span>
- <span class="cov0" title="0">opts := []grpc.ServerOption{
- grpc.UnaryInterceptor(logGRPC),
- }
- server := grpc.NewServer(opts...)
- s.server = server
- if ids != nil </span><span class="cov0" title="0">{
- csi.RegisterIdentityServer(server, ids)
- }</span>
- <span class="cov0" title="0">if ns != nil </span><span class="cov0" title="0">{
- csi.RegisterNodeServer(server, ns)
- }</span>
- <span class="cov0" title="0">klog.Infof("Listening for connections on address: %#v", listener.Addr())
- server.Serve(listener)</span>
- }
- func parseEndpoint(ep string) (string, string, error) <span class="cov0" title="0">{
- if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") </span><span class="cov0" title="0">{
- s := strings.SplitN(ep, "://", 2)
- if s[1] != "" </span><span class="cov0" title="0">{
- return s[0], s[1], nil
- }</span>
- }
- <span class="cov0" title="0">return "", "", fmt.Errorf("Invalid endpoint: %v", ep)</span>
- }
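- // exampleParseEndpoint is a hypothetical usage sketch, not part of the original file: for an
- // endpoint such as "tcp://127.0.0.1:10000", parseEndpoint returns the network ("tcp") and
- // address ("127.0.0.1:10000") that serve then passes to net.Listen.
- func exampleParseEndpoint() (string, string, error) {
-     return parseEndpoint("tcp://127.0.0.1:10000")
- }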
- func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) <span class="cov0" title="0">{
- klog.V(3).Infof("GRPC call: %s", info.FullMethod)
- klog.V(5).Infof("GRPC request: %+v", protosanitizer.StripSecrets(req))
- resp, err := handler(ctx, req)
- if err != nil </span><span class="cov0" title="0">{
- klog.Errorf("GRPC error: %v", err)
- }</span> else<span class="cov0" title="0"> {
- klog.V(5).Infof("GRPC response: %+v", protosanitizer.StripSecrets(resp))
- }</span>
- <span class="cov0" title="0">return resp, err</span>
- }
- </pre>
- <pre class="file" id="file10" style="display: none">package metrics
- import (
- "github.com/prometheus/client_golang/prometheus"
- )
- const (
- separator = "_"
- sharesSubsystem = "openshift_csi_share"
- mount = "mount"
- mountCountName = sharesSubsystem + separator + mount + separator + "requests_total"
- mountFailureCountName = sharesSubsystem + separator + mount + separator + "failures_total"
- MetricsPort = 6000
- )
- var (
- mountCounter, failedMountCounter = createMountCounters()
- )
- func createMountCounters() (prometheus.Counter, prometheus.Counter) <span class="cov8" title="1">{
- return prometheus.NewCounter(prometheus.CounterOpts{
- Name: mountCountName,
- Help: "Counts share volume mount attempts.",
- }),
- prometheus.NewCounter(prometheus.CounterOpts{
- Name: mountFailureCountName,
- Help: "Counts failed share volume mount attempts.",
- })
- }</span>
- func init() <span class="cov8" title="1">{
- prometheus.MustRegister(mountCounter)
- prometheus.MustRegister(failedMountCounter)
- }</span>
- func IncMountCounters(succeeded bool) <span class="cov8" title="1">{
- if !succeeded </span><span class="cov8" title="1">{
- failedMountCounter.Inc()
- }</span>
- <span class="cov8" title="1">mountCounter.Inc()</span>
- }
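- // shareMountMetricNames is a hypothetical helper, not part of the original file, spelling out
- // what the constants above expand to: "openshift_csi_share_mount_requests_total" and
- // "openshift_csi_share_mount_failures_total". A typical PromQL failure-ratio query over these
- // names (an illustration, not from this repo) would be:
- //   rate(openshift_csi_share_mount_failures_total[5m]) / rate(openshift_csi_share_mount_requests_total[5m])
- func shareMountMetricNames() (string, string) {
-     return mountCountName, mountFailureCountName
- }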
- </pre>
- <pre class="file" id="file11" style="display: none">package metrics
- import (
- "context"
- "errors"
- "fmt"
- "net/http"
- "time"
- "github.com/prometheus/client_golang/prometheus/promhttp"
- "k8s.io/klog/v2"
- )
- var (
- // these files are mounted from the openshift secret
- // shared-resource-csi-driver-node-metrics-serving-cert
- // by the csi-driver-shared-resource-operator
- tlsCRT = "/etc/secrets/tls.crt"
- tlsKey = "/etc/secrets/tls.key"
- )
- // BuildServer creates the http.Server struct
- func BuildServer(port int) (*http.Server, error) <span class="cov8" title="1">{
- if port <= 0 </span><span class="cov0" title="0">{
- klog.Error("invalid port for metric server")
- return nil, errors.New("invalid port for metrics server")
- }</span>
- <span class="cov8" title="1">bindAddr := fmt.Sprintf(":%d", port)
- router := http.NewServeMux()
- router.Handle("/metrics", promhttp.Handler())
- srv := &http.Server{
- Addr: bindAddr,
- Handler: router,
- }
- return srv, nil</span>
- }
- // StopServer stops the metrics server
- func StopServer(srv *http.Server) <span class="cov0" title="0">{
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- if err := srv.Shutdown(ctx); err != nil </span><span class="cov0" title="0">{
- klog.Warningf("Problem shutting down HTTP server: %v", err)
- }</span>
- }
- // RunServer starts the metrics server.
- func RunServer(srv *http.Server, stopCh <-chan struct{}) <span class="cov8" title="1">{
- go func() </span><span class="cov8" title="1">{
- err := srv.ListenAndServeTLS(tlsCRT, tlsKey)
- if err != nil && err != http.ErrServerClosed </span><span class="cov0" title="0">{
- klog.Errorf("error starting metrics server: %v", err)
- }</span>
- }()
- <span class="cov8" title="1"><-stopCh
- if err := srv.Close(); err != nil </span><span class="cov0" title="0">{
- klog.Errorf("error closing metrics server: %v", err)
- }</span>
- }
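- // startMetrics is a hypothetical wiring sketch, not part of the original file, combining the
- // helpers above: build the TLS metrics server on MetricsPort and run it until stopCh closes.
- func startMetrics(stopCh <-chan struct{}) error {
-     srv, err := BuildServer(MetricsPort)
-     if err != nil {
-         return err
-     }
-     // RunServer blocks on stopCh, so run it in its own goroutine.
-     go RunServer(srv, stopCh)
-     return nil
- }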
- </pre>
- </div>
- </body>
- <script>
- (function() {
- var files = document.getElementById('files');
- var visible;
- files.addEventListener('change', onChange, false);
- function select(part) {
- if (visible)
- visible.style.display = 'none';
- visible = document.getElementById(part);
- if (!visible)
- return;
- files.value = part;
- visible.style.display = 'block';
- location.hash = part;
- }
- function onChange() {
- select(files.value);
- window.scrollTo(0, 0);
- }
- if (location.hash != "") {
- select(location.hash.substr(1));
- }
- if (!visible) {
- select("file0");
- }
- })();
- </script>
- </html>