카테고리 없음

logstash 시작

OKKY 2020. 4. 16. 18:11
input {
	aggregation {
    	hosts => "host:port"
        index => "keyword_index"
        type => "keyword_qc"
        schedule => "10 4 * * *"
        aggs_name => "keyword_cnt"
        docinfo => true
        query_filepath => "/etc/logstash/keyword_cnt.es"
        parameters => [
        	{ "_day_" => "1" "_size_" => "1234" "_bool_" => "true" "_astric_" => "*" },
        	{ "_day_" => "2" "_size_" => "2345" "_bool_" => "false" "_astric_" => "*" }
        ]
        result_has_parameters => true
    }
    
    jdbc {
    	connection_retry_attempts => 3
        connection_retry_attempts_wait_time => 1
        jdbc_driver_library => "/etc/logstash/lib/mysql-connector-java-version.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_validate_connection => false
        jdbc_connection_string => "jdbc:mysql://host:port/db"
        jdbc_user => "simon"
        jdbc_password => "simon_work1004"
        jdbc_fetch_size => 1000
        statement_filepath => "/etc/logstash/keyword_rank.sql"
        type => "keyword_buy"
        schedule => "30 2 * * *"
        last_run_metadata_path=>"/etc/logstash/.logstash_jdbc_last_run"

    }
}

filter {
	if [type] == "keyword" {
    	 ruby {
            code => "
                from = event.get('parameters')['_day_'].to_i
                dateTime = (DateTime.new(Time.now.year, Time.now.month, Time.now.day, 0, 0, 0, 0) - from)
                date = dateTime.strftime('%s')
                event.set('date_index', dateTime.strftime('%y%m'))
                id = date + '_' + [event.get('key')].pack('m0')
                event.set('searchKeyword', event.get('key'))
                event.set('keyword', event.get('key').strip.upcase)
                event.set('date', date)
                event.set('id', id)

                platform = event.get('parameters')['_platform_']

                if platform == '*'
                    platform = 'total'
                end

                obj = Hash.new
                obj['keyword1_cnt'] = event.get('keyword2_cnt')

                event.set(platform, obj)
            "
        }

        mutate {
            replace => {
                "[@metadata][_index]" => "keyword_%{date_index}"
                "[@metadata][_type]" => "keyword"
                "[@metadata][_id]" => "%{[id]}"
                "[tags]" => "%{[type]}"
            }
            remove_field => [ "@timestamp", "@version", "type", "doc_count", "id", "parameters", "date_index", "key" ]
        }

        prune {
            whitelist_names => [ 'web', 'app', 'total', 'date', 'keyword', 'tags' ]
        }

    }
    else if [type] == "keyword2" {
    
    }
}

output {
	if "keyword" in [tags] or "keyword2" in [tags] {
    	elasticsearch {
        	action => "update"
            hosts => 
            index => "%{[@metadata][_index]}"
            document_type => "~"
            document_id => "~"
            doc_as_upsert => true
            template_name => "keyword_temp"
            template => "/etc/logstash/template/keyword_temp.json"
            template_overwrite => true
            retry_on_conflict => 5
        }
    }
}