Introduction
If you're planning to set up Elasticsearch specifically for use with Halon and have no previous experience with it, we recommend that you read the Elasticsearch getting started guide first, as this article does not cover all the nuances of using the software. If you're interested in sending syslog messages to Elasticsearch, have a look at our Remote syslog to Logstash guide instead.
Prerequisites
- At least one server with Elasticsearch installed (Version >= 8.X)
- A server with Kibana installed (Version >= 8.X, Optional)
- The HTTPS port of your Elasticsearch cluster must be accessible from the Halon cluster, either with or without authentication enabled
Creating the index template
We'll begin by creating the index template that we're going to use in Elasticsearch. We're going to store the mail logs as time-based data and automatically create new indices after a certain time frame has passed.
We will configure the index template like this:
- The template pattern will match on indices that are created with a halon- prefix
- Use one primary shard for each index
- Create one replica of each primary shard
- Assign explicit mappings to some of the fields we'll be using (fields that would otherwise not automatically get the optimal type assigned to them)
Once you have verified that these are the settings you want to use, run this command in the Dev Tools -> Console section of the Kibana web interface. If you do not have Kibana installed, you can send the same request with another HTTP client such as curl.
PUT /_template/halon
{
  "index_patterns": ["halon-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1,
    "analysis": {
      "analyzer": {
        "email_analyzer": {
          "type": "custom",
          "tokenizer": "uax_url_email",
          "filter": ["lowercase", "stop"]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "receivedtime": {
        "type": "date",
        "format": "epoch_millis"
      },
      "finishedtime": {
        "type": "date",
        "format": "epoch_millis"
      },
      "senderip": {
        "type": "ip"
      },
      "serverip": {
        "type": "ip"
      },
      "owner": {
        "type": "text",
        "analyzer": "email_analyzer"
      },
      "ownerdomain": {
        "type": "text",
        "analyzer": "email_analyzer"
      },
      "messageid": {
        "type": "keyword"
      }
    }
  }
}
If everything went well you should have received a response similar to this:
{ "acknowledged" : true }
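If Kibana is not available, the same request can be sent from any HTTP client. The following is a minimal Python sketch of that idea, not part of the Halon implementation. The helper name build_template_request and the abbreviated template body are illustrative assumptions; the host name is the placeholder used elsewhere in this article. The request is only built here, not sent.

```python
import json
import urllib.request


def build_template_request(base_url, template_name, template_body):
    """Build (but do not send) the PUT request that installs the template.

    Hypothetical helper for illustration; it mirrors the console command
    PUT /_template/<name> shown above.
    """
    data = json.dumps(template_body).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/_template/" + template_name,
        data=data,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Abbreviated body for the sketch; use the full template from above in practice.
template = {
    "index_patterns": ["halon-*"],
    "settings": {"number_of_shards": 1, "number_of_replicas": 1},
}

request = build_template_request("https://yourhostname.com:9200", "halon", template)
print(request.get_method(), request.full_url)

# To actually send it (authentication and TLS settings are up to your setup):
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```

A successful call should return the same acknowledged response as the console command.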
Start logging
Now we're ready to set up the logging in Halon. The easiest way to get started is to use the provided implementation code, which uses HTTP to send the mail logs in JSON format directly to Elasticsearch.
The implementation code consists of several HSL files: one is a settings file, and the rest correspond to the different smtpd contexts available in Halon.
We recommend you use the name of the file as the ID when you create the files in the HSL editor and that you create them inside a folder called elastic, which you can do by prefixing the ID with elastic/.
The settings file looks like this:
$elasticurl = "https://user:password@yourhostname.com:9200";
$indexname = "halon";
$indexrotate = "%Y-%m-%d";
The $elasticurl variable should contain the URL to your Elasticsearch installation. If you do not use authentication in Elasticsearch you can omit the user:password@ part of the URL.
The $indexname variable should match the prefix you used when creating the index template (excluding the -* part), and the $indexrotate variable should be a strftime format string, which determines the time frame after which a new index is automatically created.
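To make the relation between the two settings and the resulting index names concrete, here is a small Python sketch that joins a prefix and a strftime-formatted date in the same way the HSL code does. The function name index_name is hypothetical, and UTC is used only to keep the example reproducible; the time zone that Halon applies may differ.

```python
import fnmatch
import time


def index_name(prefix, rotate_format, timestamp):
    """Return '<prefix>-<formatted date>' for a Unix timestamp.

    Assumption: UTC is used here for reproducibility.
    """
    return prefix + "-" + time.strftime(rotate_format, time.gmtime(timestamp))


# Daily rotation, as in the settings file above.
daily = index_name("halon", "%Y-%m-%d", 1700000000)
# Monthly rotation, as an alternative format.
monthly = index_name("halon", "%Y-%m", 1700000000)

print(daily)    # halon-2023-11-14
print(monthly)  # halon-2023-11

# Both names are covered by the template pattern halon-*.
print(fnmatch.fnmatch(daily, "halon-*") and fnmatch.fnmatch(monthly, "halon-*"))
```

A coarser format such as %Y-%m results in fewer, larger indices, while the prefix keeps every index covered by the template.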
The last step to enable the logging (after adding all the files) is to include them in the corresponding contexts, for example like this at the top of the EOD context:
include "elastic/eod.hsl";
// $logdata["direction"] = "outbound"; // When used in outbound script
Note: If you already have overrides for some of the built-in functions, such as Reject and Defer, there will be a conflict when you include these files, so make sure you combine the overrides before deploying the changes.
Searching the logs
If everything has been going well you should now start getting documents logged to your Elasticsearch installation. To search in them you could use curl or similar but we personally recommend you use Kibana for this as the web interface is much simpler to use.
Note: When you first define what index pattern to use in Kibana it's important that you use the same one as the template pattern you specified (halon-* in our case) to be able to search through all the indices at the same time. It's also important to specify the receivedtime field as the time filter field for the logs to have the correct date assigned to them.
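For searching without Kibana, a request body for the Elasticsearch _search endpoint can be built by hand. The sketch below is a hedged Python illustration: the helper name build_search_body is hypothetical, and the example domain and timestamp are made up. It only uses fields defined in the index template above (ownerdomain and receivedtime).

```python
import json


def build_search_body(owner_domain, since_epoch_ms, size=20):
    """Build a query for recent log entries belonging to one owner domain.

    Hypothetical helper; the field names come from the index template.
    """
    return {
        "size": size,
        "sort": [{"receivedtime": {"order": "desc"}}],
        "query": {
            "bool": {
                # ownerdomain is an analyzed text field, so a match query is used.
                "must": [{"match": {"ownerdomain": owner_domain}}],
                # receivedtime is stored as epoch milliseconds.
                "filter": [
                    {
                        "range": {
                            "receivedtime": {
                                "gte": since_epoch_ms,
                                "format": "epoch_millis",
                            }
                        }
                    }
                ],
            }
        },
    }


body = build_search_body("example.com", 1700000000000)
print(json.dumps(body, indent=2))
```

The resulting JSON can be sent as the body of a POST request to /halon-*/_search, which searches all indices that match the template pattern.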
Visualizing the logs
When you have your logs stored in Elasticsearch it's very simple to create various types of charts and reports in Kibana. Below is just a very quick example of a dashboard that we made that tries to mimic the built-in pie charts and area charts in Halon.
Removing old indices
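Old indices can be removed with Curator. The action file below deletes indices older than 120 days based on the date in the index name (it assumes indices named halon-%Y-%m-%d). Run the action file with this command: curator action_file_name.yml. Tested with Curator 5.8.1.
~/.curator/curator.yml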
client:
  hosts:
    - 127.0.0.1
  port: 9200
  url_prefix:
  use_ssl: True
  certificate:
  client_cert: '[...].crt'
  client_key: '[...].key'
  ssl_no_validate: True
  username: user
  password: badpassword
  timeout: 30
  master_only: False

logging:
  loglevel: INFO
  logfile:
  logformat: default
  blacklist: ['elasticsearch', 'urllib3']
action_file_name.yml
actions:
  1:
    action: delete_indices
    description: >-
      Delete indices older than 120 days (based on index name)
    options:
      ignore_empty_list: True
      disable_action: False
    filters:
    - filtertype: pattern
      kind: prefix
      value: halon-
    - filtertype: age
      source: name
      direction: older
      timestring: '%Y-%m-%d'
      unit: days
      unit_count: 120
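The effect of the two filters in the action file (prefix match, then age derived from the index name) can be approximated with a few lines of Python. This is a simplified illustration of the selection logic under stated assumptions, not Curator's own implementation; the function name indices_older_than and the sample index names are made up.

```python
from datetime import datetime, timedelta, timezone


def indices_older_than(names, prefix, timestring, days, now):
    """Return the index names whose embedded date is more than `days` old.

    Simplified sketch: names without the prefix, or without a parsable
    date after the prefix, are skipped.
    """
    cutoff = now - timedelta(days=days)
    matched = []
    for name in names:
        if not name.startswith(prefix):
            continue
        try:
            stamp = datetime.strptime(name[len(prefix):], timestring)
        except ValueError:
            continue
        if stamp.replace(tzinfo=timezone.utc) < cutoff:
            matched.append(name)
    return matched


now = datetime(2024, 5, 1, tzinfo=timezone.utc)
names = ["halon-2023-12-01", "halon-2024-04-30", "other-2020-01-01", "halon-bad"]
print(indices_older_than(names, "halon-", "%Y-%m-%d", 120, now))
# ['halon-2023-12-01']
```

Because the age is taken from the index name, the timestring in the action file has to match the $indexrotate format used when the indices were created.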
Implementation code
Prerequisites
- The examples below use the http-bulk plugin, which needs to be installed.
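Each http_bulk call in the files below passes two JSON lines separated by a newline: an action line that names the target index (and optionally a document ID), followed by the document itself. This is the line-based format of the Elasticsearch bulk API. The Python sketch below shows the shape of those two lines for an index action and for an update action; the helper names and sample values are hypothetical, and how the plugin batches and sends the entries is not covered here.

```python
import json


def bulk_index_entry(index, document, doc_id=None):
    """Action line plus document line for indexing a new log entry."""
    action = {"index": {"_index": index}}
    if doc_id is not None:
        action["index"]["_id"] = doc_id
    return json.dumps(action) + "\n" + json.dumps(document)


def bulk_update_entry(index, doc_id, partial_document):
    """Action line plus partial document for updating an existing entry."""
    action = {"update": {"_index": index, "_id": doc_id}}
    return json.dumps(action) + "\n" + json.dumps({"doc": partial_document})


# Sample values only; the ID format mirrors "<transaction id>:<queue id>".
queued = bulk_index_entry(
    "halon-2023-11-14",
    {"action": "QUEUE", "recipient": "user@example.com"},
    doc_id="txn-1:1",
)
delivered = bulk_update_entry(
    "halon-2023-11-14",
    "txn-1:1",
    {"action": "DELIVER", "finishedtime": 1700000000000},
)
print(queued)
print(delivered)
```

Using the same ID for the index and the update action is what lets the delivery result from the post-delivery script be merged into the document that was created when the message was queued.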
rcptto.hsl
include "elastic/settings.hsl";

$logdata = [
    "direction" => "inbound"
];

$httpoptions = [
    "timeout" => 10,
    "background" => true,
    "background_hash" => hash($transaction["id"]),
    "background_retry_count" => 1,
    "background_retry_delay" => 1,
    "tls_default_ca" => true,
    "tls_verify_peer" => false,
    "headers" => ["Content-Type: application/json"]
];

function sendlog($logdata = []) {
    global $elasticurl, $httpoptions, $indexname, $indexrotate; // settings
    global $transaction, $arguments, $connection;

    if (!isset($logdata["owner"]))
        $logdata["owner"] = ($logdata["direction"] == "outbound") ? $transaction["sender"] : $arguments["recipient"];
    if (!isset($logdata["ownerdomain"]))
        $logdata["ownerdomain"] = ($logdata["direction"] == "outbound") ? $transaction["senderaddress"]["domain"] : $arguments["address"]["domain"];

    $time = time();
    $logdata += [
        "hostname" => gethostname(),
        "messageid" => $transaction["id"],
        "senderip" => $connection["remoteip"],
        "senderport" => $connection["remoteport"],
        "serverip" => $connection["localip"],
        "serverport" => $connection["localport"],
        "serverid" => "mailserver:".$connection["serverid"],
        "tlsstarted" => isset($connection["tls"]),
        "senderhelo" => $connection["helo"]["host"],
        "saslauthed" => isset($connection["auth"]),
        "saslusername" => $connection["auth"]["username"],
        "sender" => $transaction["sender"],
        "senderlocalpart" => $transaction["senderaddress"]["localpart"],
        "senderdomain" => $transaction["senderaddress"]["domain"],
        "senderparams" => $transaction["senderparams"],
        "recipient" => $arguments["recipient"],
        "recipientlocalpart" => $arguments["address"]["localpart"],
        "recipientdomain" => $arguments["address"]["domain"],
        "recipientparams" => $arguments["params"],
        "transportid" => $arguments["transportid"],
        "receivedtime" => round($time * 1000)
    ];

    //$path = "/".$indexname."-".strftime($indexrotate, $time)."/_doc";
    //http($elasticurl.$path, $httpoptions, [], json_encode($logdata));
    $payload = json_encode($logdata);
    if ($payload != none) {
        http_bulk("elastic", json_encode(
            [
                "index" => [
                    "_index" => $indexname."-".strftime($indexrotate, $time)
                ]
            ])."\n".$payload);
    } else {
        echo "Logging to Elastic failed.";
    }
}

function Reject(...$args) {
    global $logdata;
    $logdata["action"] = "REJECT";
    $logdata["reason"] = isset($args[0]) ? $args[0] : "";
    sendlog($logdata);
    builtin Reject(...$args);
}

function Defer(...$args) {
    global $logdata;
    $logdata["action"] = "DEFER";
    $logdata["reason"] = isset($args[0]) ? $args[0] : "";
    sendlog($logdata);
    builtin Defer(...$args);
}
eod.hsl
include "elastic/settings.hsl";

$httpoptions = [
    "timeout" => 10,
    "background" => true,
    "background_hash" => hash($transaction["id"]),
    "background_retry_count" => 1,
    "background_retry_delay" => 1,
    "tls_default_ca" => true,
    "headers" => ["Content-Type: application/json"]
];

$logdata = [
    "direction" => "inbound"
];

function sendlog($action, $reason, $recipient, $receivedtime, $actionid = none, $metadata = []) {
    global $elasticurl, $httpoptions, $indexname, $indexrotate; // settings
    global $transaction, $connection, $arguments; // eod
    global $logdata;

    $logdata += [
        "action" => $action,
        "reason" => $reason,
        "owner" => $logdata["direction"] == "outbound" ? $transaction["sender"] : $recipient["recipient"],
        "ownerdomain" => $logdata["direction"] == "outbound" ? $transaction["senderaddress"]["domain"] : $recipient["address"]["domain"],
        "hostname" => gethostname(),
        "messageid" => $transaction["id"],
        "senderip" => $connection["remoteip"],
        "senderport" => $connection["remoteport"],
        "serverip" => $connection["localip"],
        "serverport" => $connection["localport"],
        "serverid" => $connection["serverid"],
        "senderhelo" => $connection["helo"]["host"] ?? "",
        "tlsstarted" => isset($connection["tls"]),
        "saslusername" => $connection["auth"]["username"] ?? "",
        "saslauthed" => isset($connection["auth"]),
        "sender" => $transaction["sender"],
        "senderlocalpart" => $transaction["senderaddress"]["localpart"],
        "senderdomain" => $transaction["senderaddress"]["domain"],
        "senderparams" => $transaction["senderparams"],
        "recipient" => $recipient["recipient"],
        "recipientlocalpart" => $recipient["address"]["localpart"],
        "recipientdomain" => $recipient["address"]["domain"],
        "transportid" => $recipient["transportid"],
        "subject" => $arguments["mail"]->getHeader("subject"),
        "size" => $arguments["mail"]->getSize(),
        "receivedtime" => round($receivedtime * 1000),
        "metadata" => $metadata
    ];
    if ($actionid) $logdata["actionid"] = $actionid;

    //$path = "/".$indexname."-".strftime($indexrotate, $receivedtime)."/_doc";
    //if ($actionid) $path += "/".$transaction["id"].":".$actionid;
    //http($elasticurl.$path, $httpoptions, [], json_encode($logdata));
    $index = [
        "index" => [
            "_index" => $indexname."-".strftime($indexrotate, $receivedtime)
        ]
    ];
    if ($actionid) $index["index"]["_id"] = $transaction["id"].":".$actionid;

    $payload = json_encode($logdata);
    if ($payload != none) {
        http_bulk("elastic", json_encode($index)."\n".$payload);
    } else {
        echo "Logging to Elastic failed.";
    }
}

function ScanRPD(...$args) {
    global $logdata;
    $outbound = $args[0]["outbound"] ?? false;
    $logdata["score_rpd"] = builtin ScanRPD([ "outbound" => $outbound ]);
    $logdata["score_rpd_refid"] = builtin ScanRPD([ "outbound" => $outbound, "refid" => true ]);
    $logdata["score_rpdav"] = builtin ScanRPD([ "outbound" => $outbound, "extended_result" => true ])["virus_score"];
    return builtin ScanRPD(...$args);
}

function ScanSA(...$args) {
    global $logdata;
    $logdata["scores"]["sa"] = builtin ScanSA();
    $logdata["scores"]["sa_rules"] = builtin ScanSA(["rules" => true]);
    return builtin ScanSA(...$args);
}

function ScanKAV(...$args) {
    global $logdata;
    $logdata["scores"]["kav"] = builtin ScanKAV() ? : "";
    return builtin ScanKAV(...$args);
}

function ScanCLAM(...$args) {
    global $logdata;
    $logdata["scores"]["clam"] = builtin ScanCLAM() ? : "";
    return builtin ScanCLAM(...$args);
}

function Defer(...$args) {
    global $transaction, $metadata;
    foreach ($transaction["recipients"] as $recipient) {
        sendlog("DEFER", isset($args[0]) ? $args[0] : "", $recipient, time(), none, $metadata);
    }
    builtin Defer(...$args);
}

function Reject(...$args) {
    global $transaction, $metadata;
    foreach ($transaction["recipients"] as $recipient) {
        sendlog("REJECT", isset($args[0]) ? $args[0] : "", $recipient, time(), none, $metadata);
    }
    builtin Reject(...$args);
}

/*
// EOD example
include "elastic/eod.hsl";

// Queue message for all recipients
foreach ($recipients as $recipient) {
    $receivedtime = time();
    $metadata["receivedtime"] = "$receivedtime";
    $id = $mail->queue($sender, $recipient["address"], $recipient["transportid"], ["metadata" => $metadata]);
    sendlog("QUEUE", "", $recipient, $receivedtime, $id["queue"], $metadata);
}
*/
post.hsl
include "elastic/settings.hsl";

$httpoptions = [
    "timeout" => 10,
    "background" => true,
    "background_hash" => hash($message["id"]["transaction"]),
    "background_retry_count" => 1,
    "background_retry_delay" => 1,
    "tls_default_ca" => true,
    "headers" => ["Content-Type: application/json"]
];

function sendlog() {
    global $elasticurl, $httpoptions, $indexname, $indexrotate; // settings
    global $message, $arguments;

    $receivedtime = GetMetaData()["receivedtime"];
    $time = time();
    $logdata["doc"] = [
        "action" => $arguments["action"] ?? "DELIVER",
        "reason" => $arguments["attempt"]["result"]["reason"][0] ?? $arguments["attempt"]["error"]["message"] ?? "",
        "queue" => [
            "action" => $arguments["action"] ?? "DELIVER",
            "retry" => $arguments["retry"],
            "errormsg" => $arguments["attempt"]["result"]["reason"][0] ?? $arguments["attempt"]["error"]["message"] ?? "",
            "errorcode" => $arguments["attempt"]["result"]["code"],
            "transfertime" => $arguments["attempt"]["duration"]
        ],
        "sender" => $message["sender"],
        "senderdomain" => $message["senderaddress"]["domain"],
        "recipient" => $message["recipient"],
        "recipientdomain" => $message["recipientaddress"]["domain"],
        "transportid" => $message["transportid"],
        "finishedtime" => round($time * 1000)
    ];

    //$path = "/".$indexname."-".strftime($indexrotate, $receivedtime)."/_update/".$message["id"]["transaction"].":".$message["id"]["queue"];
    //http($elasticurl.$path, $httpoptions, [], json_encode($logdata));
    $payload = json_encode($logdata);
    if ($payload != none) {
        http_bulk("elastic", json_encode(
            [
                "update" => [
                    "_id" => $message["id"]["transaction"].":".$message["id"]["queue"],
                    "_index" => $indexname."-".strftime($indexrotate, $receivedtime)
                ]
            ])."\n".$payload);
    } else {
        echo "Logging to Elastic failed.";
    }
}

sendlog();
settings.hsl
$elasticurl = "https://user:password@yourhostname.com:9200";
$indexname = "halon";
$indexrotate = "%Y-%m-%d";