RE: 從零開始的 Data Pipeline(一) — Data Collector
**哈囉,大家今天過得好嗎?**在 Data Pipeline 這系列的上一篇文章中已經介紹過了整個 Data Pipeline 大概長什麼樣子?會有哪些需要用到的東西?如果還不熟悉的可以先去看一下
這篇將會專門介紹 Data Collector 這個角色。
什麼是 Data Collector
在 Data Pipeline 中勢必會有在不同服務間傳送資料的東西存在。但如果每個環節都要從頭寫的話,不僅是費時費力,更是會變得混亂難管理,相信任何人都不會想要接手這種東西,因此有了 Data Collector 的出現,最常見的有兩個:Fluentd 與 Logstash。
兩者間主要的不同點在於 Fluentd 需要依賴 Ruby 跟 Gem,也可以用 td-agent 來建立獨立的 Ruby 環境。而 Logstash 則是需要 Java 環境。
Data Collector 可以做什麼
依照我們的應用情境來說,我們會在幾個不同的環節中使用到
- Application server:追蹤服務產生的 Log 並送到 Google Pub/Sub(GCP 上類似 Apache Kafka 的服務)
- 從 Google Pub/Sub 抓資料下來做點處理,然後丟上 Google Cloud Storage 儲存
由以上兩點可以發現,在傳送、處理、接收資料時會用到 Data Collector,有了這個就可以大幅度簡化整體架構的複雜度。
此外 Fluentd 可以很容易的串連,可以透過內建的 forward 方法在不同的 Fluentd 之間傳遞,也可以透過上述的 Google Pub/Sub 做為資料中繼站。
安裝 Fluentd
雖然不免俗地寫了這個章節,但我覺得照著官方的 安裝指南 實在很簡單,所以這邊就不多贅述。
值得一提的地方在於,在非 container 的環境中,我建議使用 td-agent來啟動 Fluentd 服務。這是由 Treasure Data 提供的一個在 Fluentd 之外包了一層管理工具的版本,具有獨立的 Ruby 環境且較容易管理 Fluentd 的服務(包含自動重啟服務、log rotation 等),可以省去大量的麻煩。
你的第一個 Fluentd 設定檔
Fluentd 的設定檔中會有幾個環節
<source>
這邊設定了 input 端,常見的有 tail、forward 等<filter>
對資料的過濾、處理等步驟(非必要)<match>
這邊是定義資料的 output 端
貫串了整個設定檔的東西是 tag
,Fluentd 會透過 tag 來把每筆資料分配到他該去的地方。接著我們就來動手寫一個會去追蹤 nginx log 的設定檔。
<source>
@type tail
path /fluentd/nginx/access.log
pos_file /fluentd/log/nginx-access.log.pos
tag nginx.access
<parse>
@type nginx
keep_time_key true
</parse>
</source>
<match nginx.access>
@type stdout
</match>
Input
type 每當指定的檔案有新的一行資料進去時,Fluentd 就會抓取並執行後續動作。
path
指定追蹤 /fluentd/nginx/access.log
這檔案
pos_file
而目前處理到哪一行則會儲存在 /fluentd/log/nginx-access.log.pos
這檔案中,這樣 Fluentd 重新啟動就可以從上次的位置繼續執行。
tag
這設定會讓從這檔案而來的所有資料都打上nginx.access
這個標籤,後續的應用都會認這個標籤。
Output
<match nginx.access>
這代表只會針對有 nginx.access
這標籤的資料執行,而 @type stdout
則是只輸出結果而已
實際應用
設定檔寫完了,接下來範例我會使用 docker 來執行,來看看結果如何。知道如何使用的朋友可以跟著執行看看
# 建立存放 nginx log 的目錄
mkdir nginx
# 啟動 nginx 服務(log 存至 nginx/)
docker run -v $(PWD)/nginx:/var/log/nginx/ -d -p 8080:80 --name nginx-server nginx
# 啟動 Fluentd 服務(抓取 nginx/ 的資料)
docker run -v $(PWD)/nginx:/fluentd/nginx --rm --name fluentd lukehong/nginx-fluentd-example:basic
執行完以上兩個指令後,應該就會有一個背景執行的 nginx container,以及畫面中應該會出現一些 log 說明著 Fluentd 已經開始執行。接著用 browser 進入 http://localhost:8080/ 之後,應該就能看到幾行結果
2019-01-19 15:18:26.000000000 +0000 nginx.access: {"remote":"172.17.0.1","host":"-","user":"-","time":"19/Jan/2019:15:18:26 +0000","method":"GET","path":"/","code":"304","size":"0","referer":"-","agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36","http_x_forwarded_for":"-"}
2019-01-19 15:18:34.000000000 +0000 nginx.access: {"remote":"172.17.0.1","host":"-","user":"-","time":"19/Jan/2019:15:18:34 +0000","method":"GET","path":"/","code":"304","size":"0","referer":"-","agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36","http_x_forwarded_for":"-"}
看到這些就代表成功囉。
進階設定
基本的成功了,那麼接下來我們就要開始進階版的,以下是使用情境
- 有多台 application server
- 有一台獨立的 Fluentd aggregator
在 1 上會有各自的 Fluentd forwarder 將資料送到 2 整合並且送到 Google Cloud Storage 儲存。這就會需要用到 Fluentd 內建的 in_forward
與 out_forward
來傳送與接收資料。
這邊會只使用一組 application server 來模擬。
Fluentd forwarder
fluentd-forwarder
的作用一樣是去追蹤 nginx log,但是不再是直接輸出結果,而是透過 forward 的方式送至 fluentd-aggregator
,因此在每一台 application server 上都會有一個 fluentd-forwarder
。
# fluent.conf
<source>
@type tail
tag nginx.access
...
</source>
<source>
@type tail
tag nginx.error
...
</source>
<match nginx.*>
@type forward
...
<server>
name aggregator
host fluentd-aggregator
port 24224
weight 60
</server>
</match>
除了原有 nginx.access
這邊標籤的 log 之外,我另外再加上了 nginx.error
這個標籤的 log,讓大家了解標籤是如何運作的。
<match nginx.*>
這用法就是只要主標籤是 nginx
的會成立,但如果想要讓所有以 nginx
為開頭的標籤時,則要使用 <match nginx**>
。
而在 <server>
中的 host 可以是 IP 或是 hostname等,在我的範例中是使用 docker container link 指定的 hostname。
Fluentd aggregator
fluentd-aggregator
的作用就是收集彙整所有 log,並且 access log 丟上 Google Cloud Storage(GCS)、error log 直接顯示出來。
這篇不會介紹 GCS 的使用方法,想要跟著做的人需要事先準備好可用的 GCP service account keyfile。
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
<match nginx.error>
@type stdout
</match>
<match nginx.access>
@type gcs
project [YOUR_PROJECT]
keyfile /fluentd/etc/[YOUR_KEYFILE_NAME].json
bucket [YOUR_BUCKET_NAME]
...
</match>
Fluentd 的 forward 功能本身就會帶著標籤,因此在這邊的 <source>
中並不需要設定 tag
。
我這邊特別設計成遇到 nginx.error
這標籤時,就是直接使用 stdout
顯示出來。而遇到 nginx.access
時才會透過 Fluentd 的 fluent-plugin-gcs 這個 plugin 將資料送到 GCS 上面儲存。
要自己實做的人需要將 LukeHong/fluentd-example clone 下來,修改完 advance/aggregator/
下的 Dockerfile
與 fluent.conf
這兩支檔案中的 [YOUR_PROJECT]``[YOUR_KEYFILE_NAME]``[YOUR_BUCKET_NAME]
這三個填上正確的內容,再 build fluentd-aggregator
的 image。
docker build -t fluentd-aggregator advance/aggregator/
實作應用
# 啟動 nginxdocker run -v $(PWD)/nginx:/var/log/nginx/ -d -p 8080:80 --name nginx-server nginx
# 啟動 aggregator
docker run --name fluentd-aggregator --rm fluentd-aggregator
# 啟動 fluentd-forwarder
docker run --link fluentd-aggregator -v $(PWD)/nginx:/fluentd/nginx -d --rm --name fluentd-forwarder lukehong/nginx-fluentd-example:adv-forwarder
執行完這三個指令後,分別會啟動 nginx-server``fluentd-forwarder``flutentd-aggregator
這三個 container,需要注意的地方是使用了 --link fluentd-aggregator
才能在 fluentd-forwarder
中使用 hostname 連到它。
建立完 container 後一樣先到 http://localhost:8080 產生幾筆 access log,再到不存在的頁面(如 http://localhost:8080/error)隨意產生幾筆 error log。
過一段時間後,就能發現 GCS 上確實有資料寫進去了。
$ gsutil ls gs://fluentd-example/logs/2019/01/22/
gs://fluentd-example/logs/2019/01/22/201901220754_0.gz
gs://fluentd-example/logs/2019/01/22/201901220756_0.gz
gs://fluentd-example/logs/2019/01/22/201901220802_0.gz
而且我們隨便亂打路徑產生的 error 也有顯示在 terminal 中。
# fluentd-aggregator stdout
2019-01-22 08:06:41.000000000 +0000 nginx.error: {"time":"2019/01/22 08:06:41","log_level":"error","pid":"6","tid":"6","message":"*13 open() \"/usr/share/nginx/html/error\" failed (2: No such file or directory)","client":"172.17.0.1","server":"localhost","request":"\"GET /error HTTP/1.1\"","host":"\"localhost:8080\""}
2019-01-22 08:06:41.000000000 +0000 nginx.error: {"time":"2019/01/22 08:06:41","log_level":"error","pid":"6","tid":"6","message":"*13 open() \"/usr/share/nginx/html/error\" failed (2: No such file or directory)","client":"172.17.0.1","server":"localhost","request":"\"GET /error HTTP/1.1\"","host":"\"localhost:8080\""}
這麼一來我們進階設定的實驗也成功了。
結語
在這個資料如同石油的年代,如何建立穩定、可靠、容易管理的 Data Pipeline 是一件非常重要的事情,尤其是在架構日趨複雜的時候能使用統一的 Data Collector 就會幫我們省下大量的時間與心力。
希望這篇文章能對你有幫助,如果有什麼疑問、想知道什麼相關的東西等等的歡迎在底下留言。
(刪除線)如果喜歡這篇文章的話歡迎幫我按個「拍手」,想持續看到更多文章也可以按下「追隨」喔。(刪除線)
相關連結
程式在 GitHub LukeHong/fluentd-example
Image 在 Docker hub lukehong/nginx-fluentd-example