diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/RegexLineDeserializer.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/RegexLineDeserializer.java new file mode 100644 index 0000000000..942e56811c --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/RegexLineDeserializer.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.serialization; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; +import org.apache.flume.event.EventBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + * A deserializer that parses text lines from a file. + * matching line/multiLine against the regular expression. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RegexLineDeserializer implements EventDeserializer { + + private static final Logger logger = LoggerFactory.getLogger(RegexLineDeserializer.class); + + private final ResettableInputStream in; + private final Charset outputCharset; + private volatile boolean isOpen; + + public static final String OUT_CHARSET_KEY = "outputCharset"; + public static final String CHARSET_DFLT = "UTF-8"; + + // matching line/multiLine against the regular expression + public static final String FILE_PATTERN_KEY = "filePattern"; + private Pattern pattern; + + RegexLineDeserializer(Context context, ResettableInputStream in) { + this.in = in; + this.outputCharset = Charset.forName( + context.getString(OUT_CHARSET_KEY, CHARSET_DFLT)); + this.isOpen = true; + + // regular expressions + String filePattern = context.getString(FILE_PATTERN_KEY, ""); + if (null != filePattern && !"".equals(filePattern)) { + this.pattern = Pattern.compile(filePattern, Pattern.DOTALL); + } + } + + /** + * Reads a line from a file and returns an event + * @return Event containing parsed line + * @throws IOException + */ + @Override + public Event readEvent() throws IOException { + ensureOpen(); + + if (null != pattern) { + String multiLine = readLineRegex(); + if (null == multiLine || "".equals(multiLine)) { + return null; + } else { + return EventBuilder.withBody(multiLine, outputCharset); + } + } + + String line = readLine(); + if (line == null) { + return null; + } else { + return EventBuilder.withBody(line, outputCharset); + } + } + + /** + * Batch line read + * @param numEvents Maximum number of events to return. + * @return List of events containing read lines + * @throws IOException + */ + @Override + public List readEvents(int numEvents) throws IOException { + ensureOpen(); + List events = Lists.newLinkedList(); + for (int i = 0; i < numEvents; i++) { + Event event = readEvent(); + if (event != null) { + events.add(event); + } else { + break; + } + } + return events; + } + + @Override + public void mark() throws IOException { + ensureOpen(); + in.mark(); + } + + @Override + public void reset() throws IOException { + ensureOpen(); + in.reset(); + } + + @Override + public void close() throws IOException { + if (isOpen) { + reset(); + in.close(); + isOpen = false; + } + } + + private void ensureOpen() { + if (!isOpen) { + throw new IllegalStateException("Serializer has been closed"); + } + } + + // TODO: consider not returning a final character that is a high surrogate + // when truncating + private String readLine() throws IOException { + StringBuilder sb = new StringBuilder(); + int c; + int readChars = 0; + while ((c = in.readChar()) != -1) { + readChars++; + + // FIXME: support \r\n + if (c == '\n') { + break; + } + + sb.append((char)c); + } + + if (readChars > 0) { + return sb.toString(); + } else { + return null; + } + } + + private String readLineRegex() throws IOException { + StringBuilder sb = new StringBuilder(); + StringBuilder multiSb = new StringBuilder(); + Matcher matcher; + int c; + boolean hasRegex = false; + long nextLineStart = in.tell(); + boolean isLastLine = true; + while ((c = in.readChar()) != -1) { + // FIXME: support \r\n + if (c == 65279) { + continue; + } + if (c == '\n') { + matcher = pattern.matcher(sb.toString()); + if (matcher.matches()) { + if (hasRegex) { + in.seek(nextLineStart); + sb = new StringBuilder(); + isLastLine = false; + break; + } + hasRegex = true; + multiSb = new StringBuilder(); + } + if (sb.toString() != null && !"".equals(sb.toString())) { + multiSb.append(sb).append((char) c); + } + sb = new StringBuilder(); + nextLineStart = in.tell(); + } else { + sb.append((char) c); + } + } + + // last line + if (isLastLine && hasRegex) { + if (sb.toString() != null || !"".equals(sb.toString())) { + matcher = pattern.matcher(sb.toString()); + // matched or not + if (matcher.matches()) { + // first line + in.seek(nextLineStart); + sb = new StringBuilder(); + } else { + multiSb.append(sb); + } + } + } + String multiLine = multiSb.toString(); + if (multiLine != null && !"".equals(multiLine)) { + if (multiSb.lastIndexOf("\n") == multiSb.length() - 1 && multiSb.length() > 1) { + multiLine = multiLine.substring(0, multiSb.length() - 1); + } + return multiLine; + } else { + // last line + if (sb.toString() != null || !"".equals(sb.toString())) { + matcher = pattern.matcher(sb.toString()); + if (matcher.matches()) { + return sb.toString(); + } + } + return null; + } + } + + public static class Builder implements EventDeserializer.Builder { + + @Override + public EventDeserializer build(Context context, ResettableInputStream in) { + return new RegexLineDeserializer(context, in); + } + + } + +} diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestRegexLineDeserializer.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestRegexLineDeserializer.java new file mode 100644 index 0000000000..ca7d02d48c --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestRegexLineDeserializer.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.serialization; + +import java.io.IOException; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.junit.Before; +import org.junit.Test; + +public class TestRegexLineDeserializer { + + private String fileInfo; + + @Before + public void setup() { + StringBuilder sb = new StringBuilder(); + sb.append("2016-10-19 10:19:10,188 INFO - Configuration provider starting"); + sb.append("\n"); + sb.append("2016-10-19 10:19:10,199 DEBUG - Starting validation of for agent: agent"); + sb.append("\n"); + sb.append("SOURCES: {s1={ parameters:{decodeErrorPolicy=REPLACE} }}"); + sb.append("\n"); + sb.append("CHANNELS: {c1={ parameters:{checkpointDir=./agent/checkpoint} }}"); + sb.append("\n"); + sb.append("SINKS: {k1={ parameters:{clusterName=es, indexType=test4} }}"); + sb.append("\n"); + sb.append("\n"); + sb.append("2016-10-19 10:19:10,204 DEBUG - Created channel c1"); + sb.append("\n"); + fileInfo = sb.toString(); + } + + @Test + public void testMultiLine() throws IOException { + // ^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} + // ^((${srcStartWithString})(\\s|\\S)).* + Context context = new Context(); + context.put("filePattern", "^((\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3})(\\s|\\S)).*"); + ResettableInputStream in = new TestRegexLineInputStream(fileInfo); + EventDeserializer des = new RegexLineDeserializer(context, in); + Event evt = des.readEvent(); + String line1 = new String(evt.getBody(), "UTF-8"); + System.out.println("line 1\n" + line1); + des.mark(); + evt = des.readEvent(); + String line2 = new String(evt.getBody(), "UTF-8"); + System.out.println("line 2\n" + line2); + des.mark(); + evt = des.readEvent(); + String line3 = new String(evt.getBody(), "UTF-8"); + System.out.println("line 3\n" + line3); + des.reset(); + des.mark(); + des.close(); + } +} diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestRegexLineInputStream.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestRegexLineInputStream.java new file mode 100644 index 0000000000..5e12d68579 --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestRegexLineInputStream.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.serialization; + +import java.io.IOException; + +public class TestRegexLineInputStream extends ResettableInputStream { + + private String str; + int markPos = 0; + int curPos = 0; + + /** + * Warning: This test class does not handle character/byte conversion at all! + * @param str String to use for testing + */ + public TestRegexLineInputStream(String str) { + this.str = str; + } + + @Override + public int readChar() throws IOException { + if (curPos >= str.length()) { + return -1; + } + return str.charAt(curPos++); + } + + @Override + public void mark() throws IOException { + markPos = curPos; + } + + @Override + public void reset() throws IOException { + curPos = markPos; + } + + @Override + public void seek(long newPos) throws IOException { + curPos = Integer.valueOf(newPos + ""); + } + + @Override + public long tell() throws IOException { + return curPos; + } + + @Override + public int read() throws IOException { + throw new UnsupportedOperationException("This test class doesn't return " + + "bytes!"); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + throw new UnsupportedOperationException("This test class doesn't return " + + "bytes!"); + } + + @Override + public void close() throws IOException { + // no-op + } +}