Skip to content

Commit c08bd28

Browse files
authored
Merge pull request apache#32 from wangtuanjie/wtj
x
2 parents dd3da91 + 14abb6c commit c08bd28

File tree

11 files changed

+457
-6
lines changed

11 files changed

+457
-6
lines changed

scala/base/src/main/scala/com/pili/Json.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ object Json {
2121

2222
def marshalAsString(obj: AnyRef): String = mapper.writeValueAsString(obj)
2323

24-
def unmarsha[T: ClassTag](value: Array[Byte]): T = mapper.readValue(value, getClassTag[T].runtimeClass.asInstanceOf[Class[T]])
24+
def unmarshal[T: ClassTag](value: Array[Byte]): T = mapper.readValue(value, getClassTag[T].runtimeClass.asInstanceOf[Class[T]])
2525

2626
def unmarshal[T: ClassTag](in: InputStream): T = mapper.readValue(in, getClassTag[T].runtimeClass.asInstanceOf[Class[T]])
2727

scala/base/src/main/scala/com/pili/Mongo.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package com.pili
22

3+
import java.util.concurrent.TimeUnit
4+
35
import scala.collection.JavaConversions._
46

57
import com.mongodb.client.MongoCollection
8+
import com.mongodb.client.model.IndexOptions
69
import com.mongodb.{MongoClient, ServerAddress}
710
import org.bson.{BsonInt32, Document}
811

@@ -11,9 +14,14 @@ object Mongo {
1114
def ensureUniqueIndex(coll: MongoCollection[Document], keys: Seq[String]) = {
1215
val index = new Document()
1316
keys.foreach(index.append(_, new BsonInt32(1)))
14-
val opt = new Document()
15-
opt.append("unique", true)
16-
coll.createIndex(index)
17+
val opt = new IndexOptions().unique(true)
18+
coll.createIndex(index, opt)
19+
}
20+
21+
def ensureExpireIndex(coll: MongoCollection[Document], field: String) = {
22+
val index = new Document().append(field, new BsonInt32(1))
23+
val opt = new IndexOptions().expireAfter(1L, TimeUnit.SECONDS)
24+
coll.createIndex(index, opt)
1725
}
1826

1927
def newMongoClient(a: String): MongoClient = {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.pili.biz
2+
3+
4+
object QiniuAudit {
5+
def apply(log: String) = {
6+
try {
7+
Some(new QiniuAudit(log))
8+
} catch {
9+
case e: Exception => None
10+
}
11+
}
12+
}
13+
14+
@SerialVersionUID(311725166982L)
15+
class QiniuAudit(val log: String) extends java.io.Serializable {
16+
17+
val fields = log.split("\t")
18+
require(fields.length == 12)
19+
20+
def service: String = fields(1)
21+
22+
val startTime: Long = fields(2).toLong
23+
val endTime: Long = startTime + fields(11).toLong
24+
25+
def method: String = fields(3)
26+
27+
def path: String = fields(4)
28+
29+
def reqBody: String = fields(6)
30+
31+
val statusCode: Int = fields(7).toInt
32+
33+
def respBody: String = fields(9)
34+
35+
val respBodyLen: Long = fields(10).toLong
36+
37+
override def toString: String = log
38+
39+
override def hashCode: Int = log.hashCode
40+
41+
override def equals(a: Any): Boolean = a match {
42+
case l: QiniuAudit => log.equals(l.log)
43+
case _ => false
44+
}
45+
46+
def elapsed: Long = endTime - startTime
47+
48+
def contains(key: String): Boolean = fields.exists(_.contains(key))
49+
50+
def lineContains(key: String): Boolean = log.contains(key)
51+
52+
}

scala/biz/src/main/scala/com/pili/biz/QiniuLog.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.pili.biz
22

3-
43
// logsync收集回来的日志,会在每行尾添加 \t$sname\t$host
54
class QiniuLog(val line: String) {
65

scala/project/build.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ object Build extends Build {
1212
organization := "pili.com",
1313
libraryDependencies ++= Seq(
1414
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
15+
"org.slf4s" %% "slf4s-api" % "1.7.10",
1516
"joda-time" % "joda-time" % "2.8",
1617
"org.mongodb" % "mongo-java-driver" % "3.2.1",
1718
"com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4",
@@ -26,7 +27,7 @@ object Build extends Build {
2627
"org.apache.spark" %% "spark-hive" % "1.6.2" % "provided"
2728
)
2829

29-
lazy val root = project.in(file(".")).aggregate(base, biz, loadStreamgateAudit)
30+
lazy val root = project.in(file(".")).aggregate(base, biz, loadStreamgateAudit, statException)
3031

3132
lazy val base = Project("base", file("base"))
3233
.settings(defaultSettings: _*)
@@ -41,5 +42,12 @@ object Build extends Build {
4142
.settings(defaultSettings: _*)
4243
.settings(libraryDependencies ++= sparkDependencies)
4344
.settings(libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.5.2" % "provided")
45+
46+
lazy val statException = Project("statException", file("statException"))
47+
.dependsOn(base)
48+
.dependsOn(biz)
49+
.settings(defaultSettings: _*)
50+
.settings(libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1")
51+
.settings(libraryDependencies += "org.apache.flume" % "flume-ng-sdk" % "1.5.2")
4452
}
4553

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package com.pili.statexception
2+
3+
import java.util.Date
4+
5+
import com.mongodb.MongoClient
6+
import com.mongodb.client.MongoCollection
7+
import com.mongodb.client.model.UpdateOptions
8+
import org.bson.Document
9+
import org.joda.time.format.DateTimeFormat
10+
11+
import com.pili.{Json, Mongo, Props}
12+
13+
import com.pili.biz.QiniuLog
14+
15+
16+
object Forward {
17+
18+
case class Entry(date: Date, hub: String, stream: String, reason: String, remote: String, local: String)
19+
20+
21+
case class ConnectInfo(
22+
Role: String,
23+
URL: String,
24+
RemoteAddr: String,
25+
LocalAddr: String
26+
)
27+
28+
case class OnStopInfo(streamID: String, infos: Seq[ConnectInfo], detail: String)
29+
30+
val dateFmt = DateTimeFormat.forPattern("yyyy/MM/dd HH:mm:ss.SSSSSS")
31+
32+
// scalastyle:off
33+
// 2016/08/03 00:36:24.694523 [ZUwqAAEbL5_lCmcU][INFO] qiniu.com/pili.v1/streamgate/callback.go:319: onStop {"streamID":"z1.1024.1024","infos":[{"IsSource":false,"Role":"dialer","URL":"rtmp://wtj.com/1024/1024?role=customForward","Method":"publish","ConnID":"2rT6m841Si6IpVAL","RemoteAddr":"172.21.1.102:1935","LocalAddr":"10.200.20.28:55600","Source":"","Status":"disconnected","Length":64809,"Frames":{"Video":14,"Audio":26,"Data":0},"Delay":0,"Duration":0,"ExtraData":{"SourceID":"4gQIPJTcScqDKrEL","Dialer":{"url":"rtmp://wtj.com/1024/1024?role=customForward","method":"publish","timeoutSecond":30},"PingInterval":0,"CallbackData":{"hub":"1024","reqId":"Y3M4LgAAAAAb0BxM","streamId":"z1.1024.1024","uid":"1380518997","url":"rtmp://wtj.com/1024/1024?role=customForward"}},"NoGopCache":false}],"detail":"EOF"}
34+
// scalastyle:on
35+
def parse(line: String): Option[Entry] = {
36+
if (line.indexOf(": onStop") == -1) {
37+
return None
38+
}
39+
if (line.indexOf("base.ExtraData") != -1) {
40+
// 过滤未升级前 来的日志格式
41+
return None
42+
}
43+
val d = dateFmt.parseDateTime(line.substring(0, 26)).toDate
44+
val l = line.indexOf("{")
45+
val r = line.lastIndexOf("}")
46+
val jl = line.substring(l, r + 1)
47+
val info = Json.unmarshal[OnStopInfo](jl)
48+
49+
for (x <- info.infos) {
50+
if (x.URL.indexOf("role=customForward") > 0) {
51+
val fields = info.streamID.split("\\.")
52+
val hub = fields(1)
53+
val stream = fields(2)
54+
val remote = x.RemoteAddr.split(":")(0)
55+
val local = x.LocalAddr.split(":")(0)
56+
return Some(new Entry(d, hub, stream, info.detail, remote, local))
57+
}
58+
}
59+
return None
60+
}
61+
}
62+
63+
class Forward(val conf: Map[String, String]) extends Runnable {
64+
65+
import Forward._
66+
67+
68+
val kafkaConf = Props.subProps(conf, "kafka")
69+
val expireDay = conf("expire.day").toInt
70+
val expireMs = 24 * 3600 * 1000 * expireDay
71+
72+
val mgoAddr = conf("mongo.addr")
73+
val mgoDB = conf("mongo.db")
74+
val mgoColl = conf("mongo.coll.forward")
75+
76+
77+
def run(): Unit = {
78+
val mcli: MongoClient = Mongo.newMongoClient(mgoAddr)
79+
val coll = mcli.getDatabase(mgoDB).getCollection(mgoColl)
80+
Mongo.ensureUniqueIndex(coll, Seq("time", "hub", "key"))
81+
Mongo.ensureExpireIndex(coll, "expireAt")
82+
83+
84+
var n = 0L
85+
val msgStream = Util.consumerTopic(kafkaConf("topic.forward"), kafkaConf)
86+
87+
msgStream.foreach(msgs => {
88+
msgs.foreach(msg => {
89+
try {
90+
n += 1
91+
if (n % 10000 == 0) {
92+
println(s"forward handled $n")
93+
}
94+
95+
val line = Util.flumeEventDecode(msg.message())
96+
if (line.isDefined) {
97+
val l = new QiniuLog(line.get)
98+
val entry = parse(l.log)
99+
if (entry.isDefined) {
100+
val reason = reasonOf(entry.get.reason)
101+
if (reason.isDefined) {
102+
insert2Mgo(coll, entry.get)
103+
}
104+
}
105+
}
106+
} catch {
107+
case e: Exception => {
108+
e.printStackTrace()
109+
}
110+
}
111+
})
112+
})
113+
}
114+
115+
116+
def insert2Mgo(coll: MongoCollection[Document], e: Entry) = {
117+
val query = new Document("time", e.date)
118+
.append("hub", e.hub)
119+
.append("key", e.stream)
120+
121+
val opt = new UpdateOptions().upsert(true)
122+
123+
val update = new Document("$set", new Document()
124+
.append("remote", e.remote)
125+
.append("local", e.local)
126+
.append("reason", e.reason)
127+
.append("expireAt", new Date(e.date.getTime + expireMs)))
128+
129+
coll.updateOne(query, update, opt)
130+
}
131+
132+
133+
def reasonOf(reason: String): Option[String] = {
134+
reason match {
135+
case "Connection Timeout" => Some("connection timeout") // 数据超时
136+
case "EOF" => None
137+
case "unexpected EOF" => None
138+
case "source closed" => None
139+
case _ => {
140+
(reason.indexOf("connection reset by peer") > 0) match {
141+
case true => Some("connection reset by peer")
142+
case _ => Some("unknown")
143+
}
144+
}
145+
}
146+
}
147+
148+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.pili.statexception
2+
3+
import com.pili.Props
4+
5+
object Main extends App {
6+
7+
if (args.length != 1) {
8+
System.err.println("usage: ./bin <config file>")
9+
System.exit(1)
10+
}
11+
12+
val conf = Props.load(args(0))
13+
val publisher = new Thread(new Publish(conf))
14+
val forwarder = new Thread(new Forward(conf))
15+
publisher.start()
16+
forwarder.start()
17+
18+
publisher.join()
19+
forwarder.join()
20+
}

0 commit comments

Comments
 (0)