Logstash作為一個強大的數據收集和轉發工具,可以從各種來源(如Kafka、數據庫等)獲取日志數據,并通過靈活的過濾器對數據進行清洗、格式轉換和增強。經過處理后的數據可以被發送到多個目標系統,如Elasticsearch、OpenSearch或者其他數據庫和分析平臺,從而幫助企業在更短的時間內獲取、分析和展示數據。這種處理方式對于日志分析、監控、事件追蹤等場景尤為重要。
這里,我們以一個示范的例子讀取Kafka中的數據轉換后寫入到Elasticsearch,向您簡單介紹天翼云Logstash實例如何幫助企業實現高效的日志數據流處理。
前提條件
已在同VPC下開通Kafka服務和Elasticsearch實例。
獲取相應內網地址和端口,例如192.168.XX.XX:9200。
操作步驟
創建Logstash實例
在云搜索服務實例創建頁面選擇已經開通好的Elasticsearch實例,點擊進入Elasticsearch實例,選擇Logstash加裝。
訂購周期、當前區域(即資源池)、可用區、填寫實例名稱、節點規格等基礎配置信息后,按頁面提示并根據需要進行配置,然后點擊下一步。
按頁面提示,勾選相關協議,完成訂單付款,即可完成訂購,等待資源開通完成,對應實例處于“運行中”狀態,即為開通成功。
配置處理任務
在云搜索服務實例創建頁面選擇Logstash實例管理,選擇已經加裝的Logstash實例。
選擇“管道管理”,選擇“新建管道”。
在“管道配置”窗口中,配置相關的管道配置。
input {
kafka {
auto_offset_reset => "latest"
bootstrap_servers => "ip:port,ip:port,ip:port"
consumer_threads => 88
group_id => "your_group_id"
topics => ["your_kafka_topic"]
codec => json
}
}
filter {
}
output {
elasticsearch {
hosts => ["ip:port"]
index => "your_index_name-%{+YYYY.MM.dd}"
user => "your_username"
password => "your_password"
}
}
input部分用于定義數據的來源,即Logstash從哪里獲取數據。這里以Kafka為例:
auto_offset_reset:會決定從哪個位置開始消費消息
bootstrap_servers:Kafka集群的地址
consumer_threads:控制 Kafka 消費者線程的數量
group_id:消費者組ID
topics:Kafka的主題列表
codec:指定數據格式
filter部分可以進行消息處理、數據解析等操作。根據實際業務規則來處理。
output部分用于將處理后的數據推送到外部系統(如數據庫、消息隊列、搜索引擎等)。這里以Elasticsearch為例:
hosts:指定Elasticsearch的地址。
index:指定存儲到Elasticsearch的索引名稱,可以使用日期變量%{+YYYY.MM.dd} 來動態生成基于日期的索引。
user/password:提供Elasticsearch用戶名和密碼
配置完點擊“預校驗”,預校驗會使用Logstash的語法功能來檢測基礎的語法,不檢測邏輯和連通性。
預校驗通過后可以執行后續操作。
在“參數配置”中,配置相關的參數。
| 參數 | 說明 |
|---|---|
| 管道名稱 | 只能包含字母、數字、中劃線或下劃線,且必須以字母開頭,限制4-30個字符,實例內唯一 |
| 管道描述 | 限制長度50 |
| 管道工作線程數 | 控制Logstash同時處理事件的線程數 |
| 管道批處理大小 | 單個線程從input部分收集的最大事件數 |
| 管道批處理延遲 | 不滿足批處理大小最大等待時間 |
| 隊列類型 | memory模式、persisted模式 memory:內存隊列 persisted:磁盤隊列 |
| 隊列最大字節數 | persisted模式生效 隊列最大存儲大小 |
| 隊列檢查點寫入數 | persisted模式生效 隊列檢查點數量 |
啟動處理任務
配置完參數后點擊“保存并部署”。就啟動了讀取Kafka中的數據轉換后寫入到Elasticsearch的管道任務。
等到管道“運行中”可以嘗試配置的Kafka的topics中導入一些數據,例如使用Filebeat或其他程序。
可以通過Curl命令查看logstash中配置的索引名,在Elasticsearch實例是否有數據進來。
curl "//<host>:<port>/_cat/indices"停止任務
當任務執行結束后,可以在“管道管理”頁面,點擊“停止”,會執行停止管道操作。
刪除管道
當管道處于停止狀態,如后續不再需要執行該任務,點擊“刪除”,可刪除該管道的信息。
刪除實例
當任務結束,無需使用加裝的Logstash時,可以選擇退訂實例,刪除前保證正在運行的管道都已經停止。
在此執行的退訂功能僅退訂Logstash實例,Elasticsearch實例退訂在Elasticsearch實例管理退訂。