Usage
Note
This page is under active development.
Getting Started
The quickest way to bootstrap a HAMSTRING environment is the provided docker-compose.yml:
$ HOST_IP=127.0.0.1 docker compose -f docker/docker-compose.yml up
If you want to run containers individually, use:
$ HOST_IP=127.0.0.1 docker compose -f docker/docker-compose.kafka.yml up
$ docker run ...
Make sure you set the environment variable HOST_IP to your host’s IP address, so that the services can communicate with each other.
Scaling With Docker Compose
HAMSTRING has two scaling axes:
Docker Compose replicas start more containers for a service.
pipeline.scaling in config.yaml starts more workers inside each service container.
Use Docker Compose replicas when you want horizontal service scaling across containers. For the production profile, scale the production service names:
$ HOST_IP=127.0.0.1 docker compose -f docker/docker-compose.yml --profile prod up --scale logcollector=3 --scale detector=2
For the development profile, scale the -dev service names:
$ HOST_IP=127.0.0.1 docker compose -f docker/docker-compose.yml --profile dev up --scale logcollector-dev=3 --scale detector-dev=2
When using Compose replicas, set NUMBER_OF_INSTANCES for the scaled service to the same replica count so
Kafka topic creation can request enough partitions for the whole consumer group:
services:
  detector:
    environment:
      - GROUP_ID=data_analysis
      - NUMBER_OF_INSTANCES=2
The compose fragments also contain deploy.replicas fields. Use them for orchestrators that honor Compose
deploy settings; for local docker compose up runs, the explicit --scale flag is the clearest option.
For worker scaling inside a container, configure pipeline.scaling. For example, this starts two detector
processes with four worker threads each in every detector container:
pipeline:
  scaling:
    modules:
      data_analysis.detector:
        executor: hybrid
        processes: 2
        threads_per_process: 4
With --scale detector=2, that configuration creates 2 Docker replicas * 2 processes * 4 threads = 16 Kafka consumers for the detector stage. See pipeline.scaling under Pipeline Configuration for the full scaling option reference and per-instance override examples.
Installation
Install all Python requirements.
$ python -m venv .venv
$ source .venv/bin/activate
(.venv) $ sh install_requirements.sh
Now, you can start each module, e.g. the Inspector:
(.venv) $ python src/inspector/main.py
Configuration
Logline format configuration
If you want to add a new inspector or detector, you may need to adapt the logline formats if the existing ones do not contain the required information.
To do so, adapt or define logcollector formats in the main configuration file (config.yaml) under pipeline.log_collection.collectors.[collector_name].required_log_information.
Adding a new logcollector enables prefilters (and later on inspectors and detectors) to consume from a new Kafka topic.
Currently, we support timestamps, IP addresses, regular expressions, and list-based validation for data fields in a logline. For example, a logline for the DNS protocol might look like this:
2025-04-04T14:45:32.458123Z NXDOMAIN 192.168.3.152 10.10.0.3 test.com AAAA 192.168.15.34 196b
Field Definition Structure
Each entry of the required_log_information list defines one field of the input logline, and the order of the entries corresponds to the order of the values in each logline. Each entry is itself a list of two to four elements, depending on the field type. For example, a field definition might look like this:
[ "status_code", ListItem, [ "NOERROR", "NXDOMAIN" ], [ "NXDOMAIN" ] ]
Field Names and Requirements
The first entry of each field definition always corresponds to the name of the field. Certain field names are required for proper pipeline operation, while others are forbidden as they are reserved for internal use.
| Required | |
|---|---|
| Forbidden | |
Required fields must be present in the configuration as they are essential for pipeline processing. Forbidden fields are reserved for internal communication and cannot be used as custom field names.
Field Types and Validation
The second entry specifies the field type. The type determines how the validation parameters are given, that is, what the third and fourth entries contain.
There are four field types available:
| Field type | Format of 3rd entry | Format of 4th entry | Description |
|---|---|---|---|
| Timestamp | Timestamp format string | (not used) | Validates timestamp fields using Python’s strptime format. Automatically converts to ISO format for internal processing. See the timestamp entry under Configuration Examples for an example format string. |
| IpAddress | (not used) | (not used) | Validates IPv4 and IPv6 addresses. No additional parameters required. |
| RegEx | RegEx pattern as string | (not used) | Validates field content against a regular expression pattern. If the pattern matches, the field is valid. |
| ListItem | List of allowed values | List of relevant values (optional) | Validates field values against an allowed list. Optionally defines relevant values for filtering in later pipeline stages. All relevant values must also be in the allowed list. If not specified, all allowed values are deemed relevant. |
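The entry counts and the ListItem constraint in the table above can be checked before a configuration is used. The following is a minimal Python sketch, not HAMSTRING's own validation code: the function name check_field_definition and the EXPECTED_ENTRIES mapping are hypothetical, and it assumes the definitions were loaded from YAML, where unquoted type names such as Timestamp become plain strings.

```python
from __future__ import annotations

# Hypothetical mapping: how many entries each field type expects
# (name + type, plus the 3rd/4th entries the table says the type uses).
EXPECTED_ENTRIES = {
    "Timestamp": {3},
    "IpAddress": {2},
    "RegEx": {3},
    "ListItem": {3, 4},
}


def check_field_definition(definition: list) -> list[str]:
    """Return human-readable problems with one field definition (empty list if none)."""
    if len(definition) < 2:
        return ["a field definition needs at least a name and a type"]
    name, field_type = definition[0], definition[1]
    if field_type not in EXPECTED_ENTRIES:
        return [f"{name}: unknown field type {field_type!r}"]
    problems = []
    allowed_counts = EXPECTED_ENTRIES[field_type]
    if len(definition) not in allowed_counts:
        problems.append(
            f"{name}: {field_type} expects {sorted(allowed_counts)} entries, got {len(definition)}"
        )
    # ListItem: every relevant value must also appear in the allowed list.
    if field_type == "ListItem" and len(definition) == 4:
        missing = set(definition[3]) - set(definition[2])
        if missing:
            problems.append(f"{name}: relevant values not allowed: {sorted(missing)}")
    return problems
```

For example, check_field_definition(["status_code", "ListItem", ["NOERROR"], ["NXDOMAIN"]]) reports that NXDOMAIN is marked relevant without being allowed. The sketch does not check the required and forbidden field names, since those are listed separately above.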
Configuration Examples
Here are examples for each field type:
logline_format:
- [ "timestamp", Timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ]
- [ "status_code", ListItem, [ "NOERROR", "NXDOMAIN" ], [ "NXDOMAIN" ] ]
- [ "client_ip", IpAddress ]
- [ "domain_name", RegEx, '^(?=.{1,253}$)((?!-)[A-Za-z0-9-]{1,63}(?<!-)\.)' ]
- [ "record_type", ListItem, [ "A", "AAAA" ] ]
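To show how the four field types work together, here is a minimal Python sketch that validates a logline against the field definitions above. It is not HAMSTRING's implementation: the function names (validate_field, is_relevant, parse_logline), splitting the logline on whitespace, and the use of re.match (rather than a full match) are assumptions. The sample logline is shortened to the five fields of this example configuration.

```python
from __future__ import annotations

import ipaddress
import re
from datetime import datetime

# The field definitions from the configuration example above, as loaded from YAML.
LOGLINE_FORMAT = [
    ["timestamp", "Timestamp", "%Y-%m-%dT%H:%M:%S.%fZ"],
    ["status_code", "ListItem", ["NOERROR", "NXDOMAIN"], ["NXDOMAIN"]],
    ["client_ip", "IpAddress"],
    ["domain_name", "RegEx", r"^(?=.{1,253}$)((?!-)[A-Za-z0-9-]{1,63}(?<!-)\.)"],
    ["record_type", "ListItem", ["A", "AAAA"]],
]


def validate_field(value: str, definition: list) -> tuple[bool, str]:
    """Check one value against its field definition; return (valid, normalized value)."""
    field_type = definition[1]
    try:
        if field_type == "Timestamp":
            # strptime validates the format; isoformat() normalizes the value.
            return True, datetime.strptime(value, definition[2]).isoformat()
        if field_type == "IpAddress":
            ipaddress.ip_address(value)  # raises ValueError for invalid IPv4/IPv6
            return True, value
    except ValueError:
        return False, value
    if field_type == "RegEx":
        return re.match(definition[2], value) is not None, value
    if field_type == "ListItem":
        return value in definition[2], value
    raise ValueError(f"unknown field type: {field_type}")


def is_relevant(value: str, definition: list) -> bool:
    """ListItem relevance: without a 4th entry, every allowed value is relevant."""
    relevant = definition[3] if len(definition) > 3 else definition[2]
    return value in relevant


def parse_logline(line: str, logline_format: list) -> dict | None:
    """Split a logline on whitespace and validate each value; None if any check fails."""
    values = line.split()
    if len(values) != len(logline_format):
        return None
    parsed = {}
    for value, definition in zip(values, logline_format):
        valid, normalized = validate_field(value, definition)
        if not valid:
            return None
        parsed[definition[0]] = normalized
    return parsed
```

With this sketch, parse_logline("2025-04-04T14:45:32.458123Z NXDOMAIN 192.168.3.152 test.com AAAA", LOGLINE_FORMAT) returns a dict whose timestamp is "2025-04-04T14:45:32.458123", while a status code such as SERVFAIL makes the whole line invalid. The relevance check is kept separate because, per the table above, relevant values are used for filtering in later stages rather than for validation.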
Logging Configuration
The following parameters control the logging behavior.
| Parameter | Description |
|---|---|
| base | The |
| modules | For each module, the |
If a debug field is set to false, only info-level logging is shown. By default, all the fields are set to false.
Pipeline Configuration
The following parameters control the behavior of each stage of the HAMSTRING pipeline, including the functionality of the modules.
pipeline.scaling
Controls how many independent workers each pipeline module starts. Each worker owns its own Kafka
consumer and producer, so workers consuming the same topic join the same Kafka consumer group and can
process different partitions in parallel. Values from defaults apply to every module and can be
overridden per module under modules. Modules that run several configured instances can also override
an individual instance under instances.
Scaling is resolved in this order, with later levels overriding earlier ones:
pipeline.scaling.defaults
pipeline.scaling.modules.<module-name>
pipeline.scaling.modules.<module-name>.instances.<instance-name>
The instance names are the configured pipeline object names, not Docker service names. For example,
log_collection.collector.instances.dga_collector applies only to the collector whose
pipeline.log_collection.collectors[].name is dga_collector.
pipeline:
  scaling:
    defaults:
      executor: thread
      max_workers: 1
    modules:
      log_collection.collector:
        executor: thread
        max_workers: 2
        instances:
          dga_collector:
            threads: 4
      data_analysis.detector:
        executor: process
        processes: 2
      pipeline.alerter:
        executor: hybrid
        processes: 2
        threads_per_process: 4
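The layered resolution (defaults, then module, then instance) can be sketched in Python as a key-by-key merge of the example configuration just shown. This is an illustration under assumptions, not HAMSTRING's resolver: the function name resolve_scaling is hypothetical, and a shallow per-key override is assumed.

```python
from __future__ import annotations

# The scaling section of the example above, as a parsed YAML dict.
SCALING = {
    "defaults": {"executor": "thread", "max_workers": 1},
    "modules": {
        "log_collection.collector": {
            "executor": "thread",
            "max_workers": 2,
            "instances": {"dga_collector": {"threads": 4}},
        },
        "data_analysis.detector": {"executor": "process", "processes": 2},
        "pipeline.alerter": {"executor": "hybrid", "processes": 2, "threads_per_process": 4},
    },
}


def resolve_scaling(scaling: dict, module: str, instance: str | None = None) -> dict:
    """Layer defaults, module, and instance settings; later layers win key by key."""
    module_cfg = scaling.get("modules", {}).get(module, {})
    resolved = dict(scaling.get("defaults", {}))
    # Copy the module keys, leaving out the nested per-instance table itself.
    resolved.update({k: v for k, v in module_cfg.items() if k != "instances"})
    if instance is not None:
        resolved.update(module_cfg.get("instances", {}).get(instance, {}))
    return resolved
```

Here resolve_scaling(SCALING, "log_collection.collector", "dga_collector") yields executor thread, max_workers 2, and threads 4, while any other collector instance only gets the module-level values. Because the merge is key by key, the detector in this sketch still carries max_workers: 1 from defaults; this page does not state whether HAMSTRING ignores such inherited keys once the executor changes.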
| Parameter | Default | Description |
|---|---|---|
| executor | | Worker model. Valid values are thread, process, and hybrid. |
| threads | | Number of thread workers for thread mode. |
| threads_per_process | | Number of thread workers inside each process for hybrid mode. |
| processes | | Number of worker processes for process and hybrid mode. |
| | | Backwards-compatible worker-count alias. For |
| | | Alias for |
| instances | none | Per-configured-instance overrides. The nested keys must match the instance names listed below. |
In thread mode, the service process starts as many independent workers as threads specifies. In process mode, it starts as many worker processes as processes specifies, each running one worker. In hybrid mode, it starts processes worker processes, each running threads_per_process worker threads.
If executor is omitted, HAMSTRING infers it from the worker-count keys:
threads only: thread
processes only: process
processes plus threads or threads_per_process: hybrid
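The executor inference rules and the resulting worker layout per container can be sketched in Python as follows. The function names infer_executor and worker_layout are hypothetical; defaulting missing counts to 1, preferring threads_per_process over threads when both are set, and ignoring max_workers (whose exact semantics are truncated above) are assumptions of this sketch.

```python
from __future__ import annotations


def infer_executor(settings: dict) -> str | None:
    """Apply the documented inference rules when 'executor' is omitted."""
    if "executor" in settings:
        return settings["executor"]
    has_processes = "processes" in settings
    has_threads = "threads" in settings or "threads_per_process" in settings
    if has_processes and has_threads:
        return "hybrid"
    if has_processes:
        return "process"
    if "threads" in settings:
        return "thread"
    return None  # combination not covered by the documented rules


def worker_layout(settings: dict) -> tuple[int, int]:
    """Return (processes, threads per process) started in one container."""
    executor = infer_executor(settings)
    if executor == "thread":
        return 1, settings.get("threads", 1)
    if executor == "process":
        return settings.get("processes", 1), 1
    if executor == "hybrid":
        # 'threads' is accepted as an alias for 'threads_per_process' in hybrid mode.
        per_process = settings.get("threads_per_process", settings.get("threads", 1))
        return settings.get("processes", 1), per_process
    raise ValueError(f"cannot determine executor from {settings!r}")
```

Both hybrid examples on this page, {"executor": "hybrid", "processes": 2, "threads_per_process": 4} and {"processes": 2, "threads": 4}, map to the same layout of 2 processes with 4 threads each.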
For example, this starts two processes with four Kafka-consuming workers in each process:
pipeline:
  scaling:
    modules:
      data_analysis.detector:
        executor: hybrid
        processes: 2
        threads_per_process: 4
This is equivalent, because threads is an alias for threads_per_process in hybrid mode:
pipeline:
  scaling:
    modules:
      data_analysis.detector:
        processes: 2
        threads: 4
Per-instance overrides are useful when one configured stage is more expensive than another. This example
uses hybrid mode for all log collectors, but gives the dga_collector fewer workers and the
domainator_collector pure process workers:
pipeline:
  scaling:
    modules:
      log_collection.collector:
        executor: hybrid
        processes: 2
        threads_per_process: 4
        instances:
          dga_collector:
            processes: 1
            threads_per_process: 2
          domainator_collector:
            executor: process
            processes: 3
The effective number of Kafka consumers for one configured pipeline instance is:
Docker service replicas * processes * threads_per_process
For thread mode, processes is 1. For pure process mode, threads_per_process is 1.
The consumed Kafka topic needs at least that many partitions to keep every worker busy. HAMSTRING requests
at least the local worker count when creating or expanding topics; set NUMBER_OF_INSTANCES on the
service when Docker Compose replicas are used so topic creation can account for the replica count as well.
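The consumer-count formula and its relation to partitions can be checked with a few lines of Python. The helper names are hypothetical; the idle-consumer calculation relies on standard Kafka consumer-group behavior, where each partition is assigned to at most one consumer in the group, so consumers beyond the partition count receive no data. The value 12 is the fallback partition count listed under Environment Configuration for topics without a stage or exact entry.

```python
def effective_consumers(replicas: int, processes: int, threads_per_process: int) -> int:
    """Kafka consumers for one configured pipeline instance across all containers."""
    return replicas * processes * threads_per_process


def idle_consumers(consumers: int, partitions: int) -> int:
    """Consumers in one group that get no partition (each partition has one owner)."""
    return max(0, consumers - partitions)


# Detector example from this page: 2 replicas, hybrid with 2 processes x 4 threads.
detector = effective_consumers(replicas=2, processes=2, threads_per_process=4)
print(detector)                      # 16
print(idle_consumers(detector, 12))  # 4 consumers would sit idle on a 12-partition topic
print(idle_consumers(detector, 16))  # 0
```

The per-instance collector example above gives, per Docker replica, 1 * 2 = 2 consumers for dga_collector and 3 * 1 = 3 for domainator_collector (pure process mode).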
Module key |
Instance keys |
Example |
|---|---|---|
|
Full consumed input topic name. Without an instance override, the module setting applies to every logserver protocol topic. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
No per-instance key by default. |
Configure the module key directly. |
Docker Compose service replicas are configured separately from pipeline.scaling. Compose replicas add
more containers; pipeline.scaling adds more workers inside each container. Both forms of scaling use the
same Kafka consumer group for the same stage/topic.
For local Docker Compose runs, scale services with docker compose up --scale:
$ HOST_IP=127.0.0.1 docker compose -f docker/docker-compose.yml --profile prod up --scale logcollector=3 --scale detector=2
For the development profile, use the -dev service names from docker/docker-compose.yml:
$ HOST_IP=127.0.0.1 docker compose -f docker/docker-compose.yml --profile dev up --scale logcollector-dev=3 --scale detector-dev=2
The compose fragments under docker/docker-compose/dev and docker/docker-compose/prod also contain
deploy.replicas fields. Those fields document the intended replica count and are used by orchestrators
that honor Compose deploy settings. For portable local Compose usage, prefer the explicit --scale
flag and keep NUMBER_OF_INSTANCES aligned with the replica count:
services:
  detector:
    environment:
      - GROUP_ID=data_analysis
      - NUMBER_OF_INSTANCES=2
$ HOST_IP=127.0.0.1 docker compose -f docker/docker-compose.yml --profile prod up --scale detector=2
With this example and the hybrid detector config shown above, the detector starts
2 Docker replicas * 2 processes * 4 threads_per_process = 16 Kafka consumers.
pipeline.log_storage
| Parameter | Default Value | Description |
|---|---|---|
| input_file | | Path of the input file, to which data is appended during usage. Keep this setting unchanged when using Docker; modify the |
pipeline.log_collection
| Parameter | Description |
|---|---|
| name | A unique name amongst the |
| protocol_base | The lowercase protocol name to ingest data from. Currently supported: |
| required_log_information | Defines the expected format for incoming log lines. See the Logline format configuration section for more details. |
Each log_collector has a BatchHandler instance. Default configurations for all batch handlers are defined in pipeline.log_collection.default_batch_handler_config.
You can override these values for each logcollector instance by adjusting the values inside pipeline.log_collection.collectors.[collector_instance].batch_handler_config_override.
The following table lists the available configuration options.
| Parameter | Default Value | Description |
|---|---|---|
| batch_size | | Maximum number of entries in a batch. A batch is sent as soon as it reaches this size. |
| batch_timeout | | Time after which a batch is sent even if it has not reached batch_size. This mainly matters for batches that receive only a few entries and would otherwise not reach the size limit for a long time. |
| subnet_id.ipv4_prefix_length | | Prefix length, in bits, to which the client’s IPv4 address is truncated for use as Subnet ID. |
| subnet_id.ipv6_prefix_length | | Prefix length, in bits, to which the client’s IPv6 address is truncated for use as Subnet ID. |
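The interplay of default_batch_handler_config, a per-collector batch_handler_config_override, and the options in the table can be sketched in Python. This is illustrative only: the numbers in DEFAULT_BATCH_CONFIG are placeholders, not HAMSTRING's defaults; the helper names are hypothetical; a recursive merge for the nested subnet_id keys, batch_timeout in seconds, and CIDR-style masking (prefix length = number of leading address bits kept) are assumptions based on the parameter names.

```python
import copy
import ipaddress

# Placeholder values for illustration; the real defaults live in
# pipeline.log_collection.default_batch_handler_config in config.yaml.
DEFAULT_BATCH_CONFIG = {
    "batch_size": 10000,
    "batch_timeout": 30.0,
    "subnet_id": {"ipv4_prefix_length": 24, "ipv6_prefix_length": 64},
}


def merge_override(defaults: dict, override: dict) -> dict:
    """Recursively apply a batch_handler_config_override on top of the defaults."""
    merged = copy.deepcopy(defaults)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_override(merged[key], value)
        else:
            merged[key] = value
    return merged


def subnet_id(client_ip: str, config: dict) -> str:
    """Mask the client address to the configured prefix length (CIDR-style)."""
    ip = ipaddress.ip_address(client_ip)
    key = "ipv4_prefix_length" if ip.version == 4 else "ipv6_prefix_length"
    prefix = config["subnet_id"][key]
    return str(ipaddress.ip_network(f"{ip}/{prefix}", strict=False))


def should_send(entries: int, opened_at: float, now: float, config: dict) -> bool:
    """A batch is sent when it is full or when batch_timeout has elapsed."""
    return entries >= config["batch_size"] or now - opened_at >= config["batch_timeout"]
```

With an override of {"subnet_id": {"ipv4_prefix_length": 16}}, the client 192.168.3.152 maps to 192.168.0.0/16 while the IPv6 prefix length keeps its default. The deep copy ensures that one collector's override never mutates the shared defaults.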
pipeline.log_filtering
| Parameter | Description |
|---|---|
| name | A unique name amongst the prefilter configurations to identify the prefilter instance. |
| relevance_method | The name of the method used to check whether a given logline is relevant for further inspection. This check can be skipped by choosing |
| collector_name | The name of the collector configuration the prefilter consumes data from. The same collector name can be referenced in multiple prefilter configurations. |
pipeline.data_inspection
| Parameter | Description |
|---|---|
| name | A unique name amongst the inspector configurations to identify the inspector instance. |
| prefilter_name | The name of the prefilter configuration the inspector consumes data from. The same prefilter name can be referenced in multiple inspector configurations. |
| inspector_module_name | Name of the python file in |
| inspector_class_name | Name of the class inside the |
Inspectors can be added by implementing the inspector base class. More information is available at Overview. Each inspector may require additional configuration, which is also documented at Overview.
To entirely skip the anomaly detection phase, you can set inspector_module_name: "no_inspector" and inspector_class_name: "NoInspector".
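A module name plus a class name, as in inspector_module_name and inspector_class_name (or the detector equivalents), is typically resolved at runtime with importlib. The sketch below shows that general technique; it is not necessarily how HAMSTRING loads its classes, and the helper load_class and its package parameter are hypothetical. The inspector package path is truncated above, so the runnable usage uses the standard library instead.

```python
from __future__ import annotations

import importlib


def load_class(module_name: str, class_name: str, package: str | None = None) -> type:
    """Import module_name (optionally inside package) and return its class_name attribute."""
    module_path = f"{package}.{module_name}" if package else module_name
    module = importlib.import_module(module_path)
    cls = getattr(module, class_name, None)
    if not isinstance(cls, type):
        raise ImportError(f"{module_path} has no class named {class_name!r}")
    return cls


# Standard-library stand-in; for HAMSTRING this would be something like
# load_class("no_inspector", "NoInspector", package=...) with the inspector package.
decoder_cls = load_class("decoder", "JSONDecoder", package="json")
print(decoder_cls.__module__)  # json.decoder
```

Checking isinstance(cls, type) turns a misspelled class name, or a name that refers to a function, into a clear ImportError at startup instead of a later failure when the pipeline tries to instantiate it.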
pipeline.data_analysis
| Parameter | Default Value | Description |
|---|---|---|
| name | | A unique name amongst the detector configurations to identify the detector instance. |
| inspector_name | | The name of the inspector configuration the detector consumes data from. The same inspector name can be referenced in multiple detector configurations. Omit this or set |
| consume_from | | Set to |
| detector_module_name | | Name of the python file in |
| detector_class_name | | Name of the class inside the |
| model | | Model to use for the detector. |
| checksum | Not given here | Checksum for the model file to ensure integrity. |
| base_url | | Base URL for downloading the model if it is not present locally. |
| threshold | | Threshold for the detector’s classification. |
| produce_topics | | (Optional) Comma-separated list of alerter topic suffixes to produce alerts to. If left empty, defaults to the |
| next_detectors | | (Optional) Comma-separated list of detector instance names that should receive this detector’s suspicious output on detector-to-detector topics. |
| send_to_alerter | | Set to |
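The checksum parameter protects against corrupted or tampered model downloads. This page does not name the hash algorithm, so the following Python sketch assumes a SHA-256 hex digest; the helper names file_sha256 and verify_model are hypothetical. Reading in chunks keeps memory use constant for large model files.

```python
import hashlib
from pathlib import Path


def file_sha256(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash the file in 1 MiB chunks so large model files are never fully loaded."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_model(path: Path, expected_checksum: str) -> None:
    """Raise ValueError if the local model file does not match the configured checksum."""
    actual = file_sha256(path)
    if actual != expected_checksum.lower():
        raise ValueError(f"checksum mismatch for {path}: expected {expected_checksum}, got {actual}")
```

A loader could call verify_model after downloading from base_url and before handing the file to the detector, so that a mismatch stops the detector at startup rather than producing unreliable classifications.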
pipeline.alerting
| Parameter | Default Value | Description |
|---|---|---|
| log_to_file | | Boolean flag to enable/disable logging of alerts to a local file. |
| log_to_kafka | | Boolean flag to enable/disable forwarding of alerts to an external Kafka topic. |
| log_file_path | | Local file path where alerts will be appended if |
| external_kafka_topic | | Name of the external Kafka topic where alerts will be sent if |
| plugins | | List of custom alerter plugins to execute. Each plugin must specify |
pipeline.zeek
To configure the Zeek sensors that ingest data, adapt an existing entry in pipeline.zeek.sensors or add a new one.
Each configured sensor is meant to run on a different machine or network interface to collect data.
Each configured instance must also be set up in the docker-compose.yaml. The dictionary name must exactly match the name of the instance configured there.
Each sensor has the following configuration parameters:
| Parameter | Description |
|---|---|
| static_analysis | A bool to indicate whether or not a static analysis should be executed. If |
| protocols | List of lowercase names of protocols the Zeek sensor should monitor and send to the Kafka queues. Currently supported: |
| interfaces | List of network interface names for a network analysis to monitor. As the Zeek containers run in |
Environment Configuration
The following parameters control the infrastructure of the software.
| Parameter | Default Value | Description |
|---|---|---|
| kafka_brokers | | Hostnames and ports of the Kafka brokers, given as a list. The node IP must be set to the actual IP of the system where the Kafka broker runs. |
| kafka_topics_prefix | Not given here | Kafka topic name prefixes given as strings. These prefixes are used to construct the actual topic names from the name of the instance (e.g. a collector instance) that produces for the given stage. For example, a prefilter instance name is appended as a suffix to the prefilter_to_inspector prefix so that the inspector knows which topic to consume from. |
| kafka_consumer.max_poll_interval_ms | | Maximum time in milliseconds between Kafka consumer polls before Kafka removes the consumer from its group. Increase this for long-running detector batches. |
| kafka_topics.replication_factor | | Replication factor used when creating new Kafka topics. At runtime this is capped to the number of configured Kafka brokers. |
| kafka_topics.auto_expand_partitions | | If enabled, existing HAMSTRING topics with fewer than the desired partition count are automatically expanded on consumer startup. Kafka does not support shrinking partition counts, so topics that are already larger are left unchanged. |
| kafka_topics.stages | See | Per-pipeline-stage topic settings. Keys match |
| kafka_topics.topics | See | Exact per-topic settings for topics that are not represented by a pipeline prefix, for example external alert topics. Topics without a stage or exact entry use 12 partitions and the default replication factor. |
| monitoring.clickhouse_server.hostname | | Hostname of the ClickHouse server. Used by Grafana. |
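The topic rules in this table (fallback of 12 partitions, replication factor capped to the broker count, grow-only expansion) can be summarized in a small Python sketch. This is not HAMSTRING's topic manager: the function names, the settings keys partitions and replication_factor, giving exact topic entries precedence over stage entries, and requesting at least local_workers * number_of_instances partitions (based on the NUMBER_OF_INSTANCES guidance under pipeline.scaling) are all assumptions.

```python
from __future__ import annotations

DEFAULT_PARTITIONS = 12  # fallback for topics without a stage or exact entry


def desired_topic_settings(
    topic: str,
    stage: str | None,
    stages: dict,
    topics: dict,
    default_replication_factor: int,
    broker_count: int,
    local_workers: int,
    number_of_instances: int = 1,
) -> tuple[int, int]:
    """Return (partitions, replication_factor) to request for one topic."""
    settings = topics.get(topic) or stages.get(stage) or {}
    partitions = settings.get("partitions", DEFAULT_PARTITIONS)
    # Request at least one partition per expected consumer in the group.
    partitions = max(partitions, local_workers * number_of_instances)
    replication = settings.get("replication_factor", default_replication_factor)
    return partitions, min(replication, broker_count)  # capped to the broker count


def partitions_to_add(current: int, desired: int, auto_expand: bool) -> int:
    """Kafka can only grow a topic's partition count, never shrink it."""
    if not auto_expand or current >= desired:
        return 0
    return desired - current
```

For example, 8 local workers in each of 2 replicas on a single-broker setup with a default replication factor of 3 leads to a request for 16 partitions with replication factor 1; an existing 12-partition topic would then grow by 4 partitions when auto_expand_partitions is enabled, while a 24-partition topic stays unchanged.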