Large postgres logs #105
@@ -23,6 +23,8 @@ import (
 	"io"
 	"os"
 	"os/exec"
+	"path/filepath"
+	"strconv"
 	"strings"
 	"text/tabwriter"
 	"time"
@@ -47,6 +49,7 @@ import ( | |
"k8s.io/client-go/dynamic" | ||
"k8s.io/client-go/kubernetes" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/client-go/tools/remotecommand" | ||
"sigs.k8s.io/yaml" | ||
|
||
"github.com/crunchydata/postgres-operator-client/internal" | ||
|
@@ -449,7 +452,8 @@ Collecting PGO CLI logs...
 	// All Postgres Logs on the Postgres Instances (primary and replicas)
 	if numLogs > 0 {
 		if err == nil {
-			err = gatherPostgresLogsAndConfigs(ctx, clientset, restConfig, namespace, clusterName, numLogs, tw, cmd)
+			err = gatherPostgresLogsAndConfigs(ctx, clientset, restConfig,
+				namespace, clusterName, outputDir, outputFile, numLogs, tw, cmd)
 		}
 	}
@@ -955,6 +959,8 @@ func gatherPostgresLogsAndConfigs(ctx context.Context,
 	config *rest.Config,
 	namespace string,
 	clusterName string,
+	outputDir string,
+	outputFile string,
 	numLogs int,
 	tw *tar.Writer,
 	cmd *cobra.Command,
@@ -1024,30 +1030,56 @@ func gatherPostgresLogsAndConfigs(ctx context.Context,
 	}

 	logFiles := strings.Split(strings.TrimSpace(stdout), "\n")
-	for _, logFile := range logFiles {
-		writeDebug(cmd, fmt.Sprintf("LOG FILE: %s\n", logFile))
-		var buf bytes.Buffer

-		stdout, stderr, err := Executor(exec).catFile(logFile)
+	// localDirectory is created to save data on disk
+	// e.g. outputDir/crunchy_k8s_support_export_2022-08-08-115726-0400/remotePath
+	localDirectory := filepath.Join(outputDir, strings.ReplaceAll(outputFile, ".tar.gz", ""))
+
+	// flag to determine whether or not to remove localDirectory after the loop
+	// When an error happens, this flag will switch to false
+	// It's nice to have the extra data around when errors have happened
+	doCleanup := true
+
+	for _, logFile := range logFiles {
+		// get the file size to stream
+		fileSize, err := getRemoteFileSize(config, namespace, pod.Name, util.ContainerDatabase, logFile)
 		if err != nil {
-			if apierrors.IsForbidden(err) {
-				writeInfo(cmd, err.Error())
-				// Continue and output errors for each log file
-				// Allow the user to see and address all issues at once
-				continue
-			}
-			return err
+			writeDebug(cmd, fmt.Sprintf("could not get file size for %s: %v\n", logFile, err))
+			continue
 		}

-		buf.Write([]byte(stdout))
-		if stderr != "" {
-			str := fmt.Sprintf("\nError returned: %s\n", stderr)
-			buf.Write([]byte(str))
+		// fileSpecSrc is namespace/podname:path/to/file
+		// fileSpecDest is the local destination of the file
+		// These are used to help the user grab the file manually when necessary
+		fileSpecSrc := fmt.Sprintf("%s/%s:%s", namespace, pod.Name, logFile)
+		fileSpecDest := filepath.Join(localDirectory, logFile)
+		writeInfo(cmd, fmt.Sprintf("\tSize of %-85s %v", fileSpecSrc, ConvertBytes(fileSize)))
+
+		// Stream the file to disk and write the local file to the tar
+		err = streamFileFromPod(clientset, config, tw,
+			localDirectory, clusterName, namespace, pod.Name, util.ContainerDatabase, logFile, fileSize)
+
+		if err != nil {
+			doCleanup = false // prevent the deletion of localDirectory so a user can examine contents
+			writeInfo(cmd, fmt.Sprintf("\tError streaming file %s: %v", logFile, err))
+			writeInfo(cmd, fmt.Sprintf("\tCollect manually with kubectl cp -c %s %s %s",
+				util.ContainerDatabase, fileSpecSrc, fileSpecDest))
+			writeInfo(cmd, fmt.Sprintf("\tRemove %s manually after gathering necessary information", localDirectory))
+			continue
 		}
+	}

-		path := clusterName + fmt.Sprintf("/pods/%s/", pod.Name) + logFile
-		if err := writeTar(tw, buf.Bytes(), path, cmd); err != nil {
-			return err
+	// doCleanup is true when there are no errors above.
+	if doCleanup {
+		// Remove the local directory created to hold the data
+		// Errors in removing localDirectory should instruct the user to remove manually.
+		// This happens often on Windows
+		err = os.RemoveAll(localDirectory)
+		if err != nil {
+			writeInfo(cmd, fmt.Sprintf("\tError removing %s: %v", localDirectory, err))
+			writeInfo(cmd, fmt.Sprintf("\tYou may need to remove %s manually", localDirectory))
+			continue
 		}
 	}
@@ -1800,3 +1832,164 @@ func writeDebug(cmd *cobra.Command, s string) {
 	t := time.Now()
 	cmd.Printf("%s - DEBUG - %s", t.Format(logTimeFormat), s)
 }
+
+// streamFileFromPod streams the file from the Kubernetes pod to a local file.
+func streamFileFromPod(clientset *kubernetes.Clientset,
+	config *rest.Config, tw *tar.Writer,
+	localDirectory, clusterName, namespace, podName, containerName, remotePath string,
+	remoteFileSize int64) error {
+
+	// create localPath to write the streamed data from remotePath
+	// use the uniqueness of outputFile to avoid overwriting other files
+	localPath := filepath.Join(localDirectory, remotePath)
+	if err := os.MkdirAll(filepath.Dir(localPath), 0750); err != nil {
+		return fmt.Errorf("failed to create path for file: %w", err)
+	}
+	outFile, err := os.Create(filepath.Clean(localPath))
+	if err != nil {
+		return fmt.Errorf("failed to create local file: %w", err)
+	}
+
+	defer func() {
+		// ignore any errors from Close functions, the writers will be
+		// closed when the program exits
+		if outFile != nil {
+			_ = outFile.Close()
+		}
+	}()
+
+	// request to cat remotePath
+	req := clientset.CoreV1().RESTClient().
+		Get().
+		Resource("pods").
+		Name(podName).
+		Namespace(namespace).
+		SubResource("exec").
+		Param("container", containerName).
+		Param("command", "cat").
+		Param("command", remotePath).
+		Param("stderr", "true").
+		Param("stdout", "true")
+
+	exec, err := remotecommand.NewSPDYExecutor(config, "GET", req.URL())
+	if err != nil {
+		return fmt.Errorf("failed to initialize SPDY executor: %w", err)
+	}
Review thread on the executor choice:
"Could we use the […]?"
"The problem I've had there is the current flow assumes the contents will be held in memory. I couldn't figure out how to use the current flow to pass to […]."
"hmmm, that's a slight variation on the […], one slight change"
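To make the thread concrete, here is a minimal, self-contained sketch of the streaming pattern the PR adopts: exec `cat` in the container and hand `Stdout` an io.Writer backed by a local file, so the log contents never have to be held in memory. This is not the PR's code; the kubeconfig source, pod coordinates, container name, remote path, and output filename are all placeholders.

// sketch: stream a remote file to disk via exec, without buffering in memory
package main

import (
	"os"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/remotecommand"
)

func main() {
	// placeholder kubeconfig; any *rest.Config works here
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// build the exec request; pod/container/path are hypothetical
	req := clientset.CoreV1().RESTClient().Post().
		Resource("pods").Name("hippo-instance1-0000-0").Namespace("postgres-operator").
		SubResource("exec").
		VersionedParams(&corev1.PodExecOptions{
			Container: "database",
			Command:   []string{"cat", "/pgdata/pg16/log/postgresql-Mon.log"},
			Stdout:    true,
			Stderr:    true,
		}, scheme.ParameterCodec)

	exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		panic(err)
	}

	out, err := os.Create("postgresql-Mon.log")
	if err != nil {
		panic(err)
	}
	defer out.Close()

	// Stdout accepts any io.Writer: bytes are written to the file as they arrive
	if err := exec.Stream(remotecommand.StreamOptions{
		Stdout: out,
		Stderr: os.Stderr,
	}); err != nil {
		panic(err)
	}
}

This mirrors the diff's approach (the PR builds the same exec request by hand with Param calls and a GET); the point of contrast with the old flow is that catFile returned the whole file as a string held in memory.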
+
+	// stream remotePath to localPath
+	err = exec.Stream(remotecommand.StreamOptions{
+		Stdout: outFile,
+		Stderr: os.Stderr,
+		Tty:    false,
+	})
+	if err != nil {
+		return fmt.Errorf("error during file streaming: %w", err)
+	}
+
+	// compare file sizes
+	localFileInfo, err := os.Stat(localPath)
+	if err != nil {
+		return fmt.Errorf("failed to get file info: %w", err)
+	}
+	if remoteFileSize != localFileInfo.Size() {
+		return fmt.Errorf("filesize mismatch: remote size is %v and local size is %v",
+			remoteFileSize, localFileInfo.Size())
+	}
+
+	// add localPath to the support export tar
+	tarPath := fmt.Sprintf("%s/pods/%s/%s", clusterName, podName, remotePath)
+	err = addFileToTar(tw, localPath, tarPath)
+	if err != nil {
+		return fmt.Errorf("error writing to tar: %w", err)
+	}
+
+	return nil
+}
+
+// addFileToTar copies a local file into a tar archive
+func addFileToTar(tw *tar.Writer, localPath, tarPath string) error {
+	// Open the file to be added to the tar
+	file, err := os.Open(filepath.Clean(localPath))
+	if err != nil {
+		return fmt.Errorf("failed to open file: %w", err)
+	}
+	defer func() {
+		// ignore any errors from Close functions, the writers will be
+		// closed when the program exits
+		if file != nil {
+			_ = file.Close()
+		}
+	}()
+
+	// Get file info to create tar header
+	fileInfo, err := file.Stat()
+	if err != nil {
+		return fmt.Errorf("failed to get file info: %w", err)
+	}
+
+	// Create tar header
+	header := &tar.Header{
+		Name:    tarPath,                // Name in the tar archive
+		Size:    fileInfo.Size(),        // File size
+		Mode:    int64(fileInfo.Mode()), // File mode
+		ModTime: fileInfo.ModTime(),     // Modification time
+	}
+
+	// Write header to the tar
+	err = tw.WriteHeader(header)
+	if err != nil {
+		return fmt.Errorf("failed to write tar header: %w", err)
+	}
+
+	// Stream the file content to the tar
+	_, err = io.Copy(tw, file)
+	if err != nil {
+		return fmt.Errorf("failed to copy file data to tar: %w", err)
+	}
+
+	return nil
+}
+
+// getRemoteFileSize returns the size of a file within a container so that we can stream its contents
+func getRemoteFileSize(config *rest.Config,
+	namespace string, podName string, containerName string, filePath string) (int64, error) {
+
+	podExec, err := util.NewPodExecutor(config)
+	if err != nil {
+		return 0, fmt.Errorf("could not create executor: %w", err)
+	}
+	exec := func(stdin io.Reader, stdout, stderr io.Writer, command ...string,
+	) error {
+		return podExec(namespace, podName, containerName,
+			stdin, stdout, stderr, command...)
+	}
+
+	// Prepare the command to get the file size using "stat -c %s <file>"
+	command := fmt.Sprintf("stat -c %s %s", "%s", filePath)
+	stdout, stderr, err := Executor(exec).bashCommand(command)
+	if err != nil {
+		return 0, fmt.Errorf("could not get file size: %w, stderr: %s", err, stderr)
+	}
+
+	// Parse the file size from stdout
+	size, err := strconv.ParseInt(strings.TrimSpace(stdout), 10, 64)
+	if err != nil {
+		return 0, fmt.Errorf("failed to parse file size: %w", err)
+	}
+
+	return size, nil
+}
+
+// ConvertBytes converts a byte size (int64) into a human-readable format.
+func ConvertBytes(bytes int64) string {
Review thread on ConvertBytes:
"We're not exporting this command for another package to use, so […]"
"I renamed this one."
+	const unit = 1024
+	if bytes < unit {
+		return fmt.Sprintf("%d B", bytes)
+	}
+	div, exp := int64(unit), 0
+	for n := bytes / unit; n >= unit; n /= unit {
+		div *= unit
+		exp++
+	}
+	return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
+}