Skip to content

Commit edbf76e

Browse files
philrhurst (Philip Hurst) and a co-author authored
Large postgres logs (#105)
Support for large Postgres files Large Postgres files may pose a challenge for some systems. This change stores the file on disk, compares the filesize with the remote, and adds the local file to the tar. It has error handling to allow a support export to be generated when there are issues with gathering PG logs. * fixes for lint errors * Remove the cat logic for gathering PG logs. Only do streaming. * Create a unique subdirectory in outputDir (matching the unique timestamp) so that local files are logically separate from other files. Adding messaging and error handling so a user understands. * remove extra call to getRemoteFileSize * refactored getRemoteFileSize to use bashCommand to get the filesize * added copyFile function to cat a file contents to a local file * use the copyFile function rather than calling manually * added example comment for fileSpecSrc * added comment to copyFile * rename ConvertBytes to convertBytes --------- Co-authored-by: Philip Hurst <[email protected]>
1 parent 3a83853 commit edbf76e

File tree

2 files changed

+209
-19
lines changed

2 files changed

+209
-19
lines changed

internal/cmd/exec.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"bytes"
1919
"fmt"
2020
"io"
21+
"os"
2122
)
2223

2324
// Executor calls commands
@@ -110,6 +111,15 @@ func (exec Executor) catFile(filePath string) (string, string, error) {
110111
return stdout.String(), stderr.String(), err
111112
}
112113

114+
// copyFile takes the full path of a file and a local destination to save the
115+
// file on disk
116+
func (exec Executor) copyFile(source string, destination *os.File) (string, error) {
117+
var stderr bytes.Buffer
118+
command := fmt.Sprintf("cat %s", source)
119+
err := exec(nil, destination, &stderr, "bash", "-ceu", "--", command)
120+
return stderr.String(), err
121+
}
122+
113123
// patronictl takes a patronictl subcommand and returns the output of that command
114124
func (exec Executor) patronictl(cmd, output string) (string, string, error) {
115125
var stdout, stderr bytes.Buffer

internal/cmd/export.go

Lines changed: 199 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"io"
2424
"os"
2525
"os/exec"
26+
"path/filepath"
27+
"strconv"
2628
"strings"
2729
"text/tabwriter"
2830
"time"
@@ -449,7 +451,8 @@ Collecting PGO CLI logs...
449451
// All Postgres Logs on the Postgres Instances (primary and replicas)
450452
if numLogs > 0 {
451453
if err == nil {
452-
err = gatherPostgresLogsAndConfigs(ctx, clientset, restConfig, namespace, clusterName, numLogs, tw, cmd)
454+
err = gatherPostgresLogsAndConfigs(ctx, clientset, restConfig,
455+
namespace, clusterName, outputDir, outputFile, numLogs, tw, cmd)
453456
}
454457
}
455458

@@ -955,6 +958,8 @@ func gatherPostgresLogsAndConfigs(ctx context.Context,
955958
config *rest.Config,
956959
namespace string,
957960
clusterName string,
961+
outputDir string,
962+
outputFile string,
958963
numLogs int,
959964
tw *tar.Writer,
960965
cmd *cobra.Command,
@@ -1024,30 +1029,57 @@ func gatherPostgresLogsAndConfigs(ctx context.Context,
10241029
}
10251030

10261031
logFiles := strings.Split(strings.TrimSpace(stdout), "\n")
1027-
for _, logFile := range logFiles {
1028-
writeDebug(cmd, fmt.Sprintf("LOG FILE: %s\n", logFile))
1029-
var buf bytes.Buffer
10301032

1031-
stdout, stderr, err := Executor(exec).catFile(logFile)
1033+
// localDirectory is created to save data on disk
1034+
// e.g. outputDir/crunchy_k8s_support_export_2022-08-08-115726-0400/remotePath
1035+
localDirectory := filepath.Join(outputDir, strings.ReplaceAll(outputFile, ".tar.gz", ""))
1036+
1037+
// flag to determine whether or not to remove localDirectory after the loop
1038+
// When an error happens, this flag will switch to false
1039+
// It's nice to have the extra data around when errors have happened
1040+
doCleanup := true
1041+
1042+
for _, logFile := range logFiles {
1043+
// get the file size to stream
1044+
fileSize, err := getRemoteFileSize(config, namespace, pod.Name, util.ContainerDatabase, logFile)
10321045
if err != nil {
1033-
if apierrors.IsForbidden(err) {
1034-
writeInfo(cmd, err.Error())
1035-
// Continue and output errors for each log file
1036-
// Allow the user to see and address all issues at once
1037-
continue
1038-
}
1039-
return err
1046+
writeDebug(cmd, fmt.Sprintf("could not get file size for %s: %v\n", logFile, err))
1047+
continue
10401048
}
10411049

1042-
buf.Write([]byte(stdout))
1043-
if stderr != "" {
1044-
str := fmt.Sprintf("\nError returned: %s\n", stderr)
1045-
buf.Write([]byte(str))
1050+
// fileSpecSrc is namespace/podname:path/to/file
1051+
// fileSpecDest is the local destination of the file
1052+
// These are used to help the user grab the file manually when necessary
1053+
// e.g. postgres-operator/hippo-instance1-vp9k-0:pgdata/pg16/log/postgresql-Tue.log
1054+
fileSpecSrc := fmt.Sprintf("%s/%s:%s", namespace, pod.Name, logFile)
1055+
fileSpecDest := filepath.Join(localDirectory, logFile)
1056+
writeInfo(cmd, fmt.Sprintf("\tSize of %-85s %v", fileSpecSrc, convertBytes(fileSize)))
1057+
1058+
// Stream the file to disk and write the local file to the tar
1059+
err = streamFileFromPod(config, tw,
1060+
localDirectory, clusterName, namespace, pod.Name, util.ContainerDatabase, logFile, fileSize)
1061+
1062+
if err != nil {
1063+
doCleanup = false // prevent the deletion of localDirectory so a user can examine contents
1064+
writeInfo(cmd, fmt.Sprintf("\tError streaming file %s: %v", logFile, err))
1065+
writeInfo(cmd, fmt.Sprintf("\tCollect manually with kubectl cp -c %s %s %s",
1066+
util.ContainerDatabase, fileSpecSrc, fileSpecDest))
1067+
writeInfo(cmd, fmt.Sprintf("\tRemove %s manually after gathering necessary information", localDirectory))
1068+
continue
10461069
}
10471070

1048-
path := clusterName + fmt.Sprintf("/pods/%s/", pod.Name) + logFile
1049-
if err := writeTar(tw, buf.Bytes(), path, cmd); err != nil {
1050-
return err
1071+
}
1072+
1073+
// doCleanup is true when there are no errors above.
1074+
if doCleanup {
1075+
// Remove the local directory created to hold the data
1076+
// Errors in removing localDirectory should instruct the user to remove manually.
1077+
// This happens often on Windows
1078+
err = os.RemoveAll(localDirectory)
1079+
if err != nil {
1080+
writeInfo(cmd, fmt.Sprintf("\tError removing %s: %v", localDirectory, err))
1081+
writeInfo(cmd, fmt.Sprintf("\tYou may need to remove %s manually", localDirectory))
1082+
continue
10511083
}
10521084
}
10531085

@@ -1800,3 +1832,151 @@ func writeDebug(cmd *cobra.Command, s string) {
18001832
t := time.Now()
18011833
cmd.Printf("%s - DEBUG - %s", t.Format(logTimeFormat), s)
18021834
}
1835+
1836+
// streamFileFromPod streams the file from the Kubernetes pod to a local file.
1837+
func streamFileFromPod(config *rest.Config, tw *tar.Writer,
1838+
localDirectory, clusterName, namespace, podName, containerName, remotePath string,
1839+
remoteFileSize int64) error {
1840+
1841+
// create localPath to write the streamed data from remotePath
1842+
// use the uniqueness of outputFile to avoid overwriting other files
1843+
localPath := filepath.Join(localDirectory, remotePath)
1844+
if err := os.MkdirAll(filepath.Dir(localPath), 0750); err != nil {
1845+
return fmt.Errorf("failed to create path for file: %w", err)
1846+
}
1847+
outFile, err := os.Create(filepath.Clean(localPath))
1848+
if err != nil {
1849+
return fmt.Errorf("failed to create local file: %w", err)
1850+
}
1851+
1852+
defer func() {
1853+
// ignore any errors from Close functions, the writers will be
1854+
// closed when the program exits
1855+
if outFile != nil {
1856+
_ = outFile.Close()
1857+
}
1858+
}()
1859+
1860+
// Get Postgres Log Files
1861+
podExec, err := util.NewPodExecutor(config)
1862+
if err != nil {
1863+
return err
1864+
}
1865+
exec := func(stdin io.Reader, stdout, stderr io.Writer, command ...string,
1866+
) error {
1867+
return podExec(namespace, podName, containerName,
1868+
stdin, stdout, stderr, command...)
1869+
}
1870+
1871+
_, err = Executor(exec).copyFile(remotePath, outFile)
1872+
if err != nil {
1873+
return fmt.Errorf("error during file streaming: %w", err)
1874+
}
1875+
1876+
// compare file sizes
1877+
localFileInfo, err := os.Stat(localPath)
1878+
if err != nil {
1879+
return fmt.Errorf("failed to get file info: %w", err)
1880+
}
1881+
if remoteFileSize != localFileInfo.Size() {
1882+
return fmt.Errorf("filesize mismatch: remote size is %v and local size is %v",
1883+
remoteFileSize, localFileInfo.Size())
1884+
}
1885+
1886+
// add localPath to the support export tar
1887+
tarPath := fmt.Sprintf("%s/pods/%s/%s", clusterName, podName, remotePath)
1888+
err = addFileToTar(tw, localPath, tarPath)
1889+
if err != nil {
1890+
return fmt.Errorf("error writing to tar: %w", err)
1891+
}
1892+
1893+
return nil
1894+
}
1895+
1896+
// addFileToTar copies a local file into a tar archive under the entry name
// tarPath. The contents are streamed with io.Copy, so large files are not
// buffered in memory.
func addFileToTar(tw *tar.Writer, localPath, tarPath string) error {
	// Open the file to be added to the tar
	file, err := os.Open(filepath.Clean(localPath))
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	// file is guaranteed non-nil after the error check above; ignore the
	// Close error because the file is only open for reading.
	defer func() {
		_ = file.Close()
	}()

	// Get file info to create the tar header
	fileInfo, err := file.Stat()
	if err != nil {
		return fmt.Errorf("failed to get file info: %w", err)
	}

	// Build the header from the file metadata (size, mode, mod time),
	// then set the name the entry should have inside the archive.
	header, err := tar.FileInfoHeader(fileInfo, "")
	if err != nil {
		return fmt.Errorf("failed to create tar header: %w", err)
	}
	header.Name = tarPath

	// Write header to the tar
	if err := tw.WriteHeader(header); err != nil {
		return fmt.Errorf("failed to write tar header: %w", err)
	}

	// Stream the file content to the tar
	if _, err := io.Copy(tw, file); err != nil {
		return fmt.Errorf("failed to copy file data to tar: %w", err)
	}

	return nil
}
1939+
1940+
// getRemoteFileSize returns the size of a file within a container so that we can stream its contents
1941+
func getRemoteFileSize(config *rest.Config,
1942+
namespace string, podName string, containerName string, filePath string) (int64, error) {
1943+
1944+
podExec, err := util.NewPodExecutor(config)
1945+
if err != nil {
1946+
return 0, fmt.Errorf("could not create executor: %w", err)
1947+
}
1948+
exec := func(stdin io.Reader, stdout, stderr io.Writer, command ...string,
1949+
) error {
1950+
return podExec(namespace, podName, containerName,
1951+
stdin, stdout, stderr, command...)
1952+
}
1953+
1954+
// Prepare the command to get the file size using "stat -c %s <file>"
1955+
command := fmt.Sprintf("stat -c %s %s", "%s", filePath)
1956+
stdout, stderr, err := Executor(exec).bashCommand(command)
1957+
if err != nil {
1958+
return 0, fmt.Errorf("could not get file size: %w, stderr: %s", err, stderr)
1959+
}
1960+
1961+
// Parse the file size from stdout
1962+
size, err := strconv.ParseInt(strings.TrimSpace(stdout), 10, 64)
1963+
if err != nil {
1964+
return 0, fmt.Errorf("failed to parse file size: %w", err)
1965+
}
1966+
1967+
return size, nil
1968+
}
1969+
1970+
// convertBytes renders a byte count (int64) as a human-readable string,
// e.g. 500 -> "500 B", 1536 -> "1.5 KB", using 1024-based units.
func convertBytes(size int64) string {
	const step = 1024

	// Sizes below one kibibyte are reported as plain bytes.
	if size < step {
		return fmt.Sprintf("%d B", size)
	}

	// Walk up the unit scale (KB, MB, ...) using integer division so the
	// unit choice is exact even for very large values.
	divisor, unitIndex := int64(step), 0
	for remaining := size / step; remaining >= step; remaining /= step {
		divisor *= step
		unitIndex++
	}

	return fmt.Sprintf("%.1f %cB", float64(size)/float64(divisor), "KMGTPE"[unitIndex])
}

0 commit comments

Comments
 (0)