Splunk Getting Extreme Part Three

We covered an example of an Anomalous Driven (AD) context in part one and how to use tstats in part two. This time we will cover a traditional Domain type context example using Authentication data and tstats.

In XS commands, DD means Data Driven context. Here we will cover a use case using xsCreateDDContext with type=domain. Using type=domain means we are going to need a count, a min, and a max. The terms we will use are minimal, low, medium, high, and extreme. This lets us find certain levels of activity without worrying about what “normal” is versus “anomalous,” as we saw in part one.

Extreme Search Commands:

xsCreate

The Create method tells Extreme Search to create the container and populate it, or to update all the classes if the container already exists. You have to use this if the container does not already exist.

xsUpdate

This functions exactly like xsCreate, except that it will NOT work if the container does not exist; it will return an error and stop.
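As a rough sketch of that workflow, using synthetic data and illustrative names (mytest/mytestContainer, the same names used in the container-reset example below), the first run builds the context with xsCreateDDContext:

| makeresults count=100 | eval src="10.0.0.".tostring(random()%20), failures=(random()%50)+1 | stats count, min(failures) as min, max(failures) as max by src | eval max=if(max-min<5,min+5,max) | xsCreateDDContext name=mytest container=mytestContainer app=search scope=app type=domain terms="minimal,low,medium,high,extreme" uom="failures" class=src

On later scheduled runs you would swap the last command for xsUpdateDDContext with the same arguments, which refreshes the existing context and errors out if the container is missing.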

xsDeleteContext

This deletes a SPECIFIC class, or “all” if no class is specified, from a context in a container. There is no XS command that actually removes the contents from the container. Deleting against a context/container without specifying a class leaves all the class data in place, but searching against the context will act as if it does not exist. The deletion without a class only removes the default class lines; from there XS commands act as if the context is gone even though most of the class data remains, so the file still exists with most of its file size intact. There is not even an XS command to remove an entire container. We can still cheat from within Splunk. Normally you should NEVER touch the context files via the outputlookup command, as it will often corrupt the file contents. But if we want to empty a container file we can just overwrite the CSV file with empty contents. The CSV file name will be in the format: containername.context.csv

If we had made a context with:
| xsupdateddcontext name=mytest container=mytestContainer app=search scope=app class=src terms="minimal,low,medium,high,extreme"

We can nuke the contents of the file using the search:
| makeresults | outputlookup mytestContainer.context.csv

We can now populate that container with either xsCreate or xsUpdate; xsUpdate will work since the container file still exists. This trick can be handy for resetting a container and culling accumulated data when the file has grown very large over time, or when you accidentally fed too much data into it.

Let’s talk about that for a minute. What is too large? XS has to read the entire CSV into memory when it uses it, with the obvious implications. A data set of 10 rows with the normal 5 domain terms of “minimal,low,medium,high,extreme” gives us 56 lines in the CSV: 10 data items plus a default data item = 11, times 5 terms = 55, plus a header row = 56. Generally, if you are going to have 10K data items going into a context, I would make one container for it and not share that container with any other contexts. That way you are not pulling a lot of data into memory that your XS commands, such as xswhere filtering, are not even using.
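If you want a quick look at how many rows a container has accumulated, you can read the file directly with inputlookup (reading is safe, unlike writing; this sketch assumes the auth_failures container built later in this post):

| inputlookup auth_failures.context.csv | stats count AS csvRows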

One other thing to consider: the size of this file matters for Splunk search bundle replication. It is a CSV file in the lookups folder and gets distributed with everything else in the bundle. If you made a context so large the CSV was 1.5GB in size, you could negatively impact your search bundle replication and be in for the fun that brings.

xsFindBestConcept

This command comes from the Extreme Search Visualization app. It lets you run data against your context and tells you which concept terms best match each result. This command has to work pretty hard, so if the data going in is large it may take a few minutes to come back.

| tstats summariesonly=true dc(Authentication.user) as userCount from datamodel=Authentication where (nodename=Authentication.Failed_Authentication sourcetype=linux_secure) by _time, Authentication.src, Authentication.app span=1d | rename Authentication.* AS * | xsFindBestConcept userCount FROM users_by_src_1d IN auth_failures BY "src,app"

xsGetWhereCIX

This command also comes from the Extreme Search Visualization app and acts like xswhere, but it does not actually filter results. It just displays ALL results that went in and what their CIX compatibility value is for the statement you used.

| tstats summariesonly=true count as failures, dc(Authentication.user) AS userCount from datamodel=Authentication where nodename=Authentication.Failed_Authentication by _time Authentication.src, Authentication.app span=1d | eval avgFailures=failures/userCount | rename Authentication.* AS * | xsgetwherecix avgFailures from failures_by_src_1d by "src,app" in auth_failures is extreme

Min and Max:

XS with type=domain needs a count and min/max values with some depth, meaning min and max are never equal. The fun part is that HOW you get a min and max is up to you. You will see examples that just use the min() and max() functions. Other examples will take min() and make max the median()*someValue. You often have to experiment to find what fits your data and gives you an acceptable result. We touched on this value spreading in part one of Getting Extreme.

Here are a couple of different patterns, though you can do it any way you like.

  1. stats min(count) as min, max(count) as max … | eval max=if(min=max,min+5,max) | eval max=if(max-min<5,min+5,max)

  2. stats min(count) as min, median(count) as median, avg(count) as average … | eval median=if(average-median<5,median+5,average) | eval max=median*2 (see the fuller sketch after this list)
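As an untested sketch, here is how pattern 2 would slot into the failure-rate context gen used later in this post (the post's actual context gen below uses pattern 1; only the stats and eval steps change):

| tstats summariesonly=true count as failures, dc(Authentication.user) AS userCount from datamodel=Authentication where nodename=Authentication.Failed_Authentication by _time, Authentication.src, Authentication.app span=1d | eval avgFailures=failures/userCount | stats count, min(avgFailures) as min, median(avgFailures) as median, avg(avgFailures) as average by Authentication.src, Authentication.app | rename Authentication.* AS * | eval median=if(average-median<5,median+5,average) | eval max=median*2 | xsCreateDDContext name=failures_by_src_1d app=search container=auth_failures scope=app type=domain terms="minimal,low,medium,high,extreme" notes="login failures by src by day" uom="failures" class="src,app"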

If you don’t get min/max spread out you will see a message like the following when trying to generate your context.

xsCreateDDContext-W-121: For a domain context failures_by_src_1d with class 103.207.36.133:sshd, min must be less than max, skipping

Use Case: Authentication Abusive Source IPs

Question: We will define our question as: what source IPs are abusing our systems via authentication failures, by src and application type? We want to know by average failures per number of user accounts tried per day. We also want to know if a source simply failed against an extreme number of user accounts, regardless of the number of failures per day. Yeah, normally I would do this by hour or an even shorter period. The test data I have is from a Raspberry Pi exposed to the Internet. The RPi is sending to Splunk using the UF for Raspberry Pi. That RPi is also running fail2ban, so it limits the number of failures a source can cause before it is banned for a while. This means we will work with a scale that typically maxes out at around 6 tries.

Avg Failures/userCount by src by day

Here we divide the number of failures by the number of users. This gives us a ballpark number of failures per user account from a given source. We could put user into the class, but that would make our trend too specific by tying it to a distinct src, app, and user. We want more of a threshold of failures per user per source in a day.

Context Gen:

| tstats summariesonly=true count as failures, dc(Authentication.user) AS userCount from datamodel=Authentication where nodename=Authentication.Failed_Authentication by _time Authentication.src, Authentication.app span=1d | eval avgFailures=failures/userCount | stats count, avg(avgFailures) as average, min(avgFailures) as min, max(avgFailures) as max by Authentication.src, Authentication.app | rename Authentication.* AS * | eval max=if(min=max,min+5,max) | xsCreateDDContext name=failures_by_src_1d app=search container=auth_failures scope=app type=domain terms="minimal,low,medium,high,extreme" notes="login failures by src by day" uom="failures" class="src,app"
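Before wiring the new context into an xswhere filter, it can be worth a sanity check with xsFindBestConcept (covered above) to see which terms your sources land in. A sketch, with the usual caveat that it can take a while on large data:

| tstats summariesonly=true count as failures, dc(Authentication.user) AS userCount from datamodel=Authentication where nodename=Authentication.Failed_Authentication by _time, Authentication.src, Authentication.app span=1d | eval avgFailures=failures/userCount | rename Authentication.* AS * | xsFindBestConcept avgFailures FROM failures_by_src_1d IN auth_failures BY "src,app"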

Search:

Here we use the context to filter our data and find the extreme sources.

| tstats summariesonly=true count as failures, dc(Authentication.user) AS userCount from datamodel=Authentication where nodename=Authentication.Failed_Authentication by _time Authentication.src, Authentication.app span=1d | eval avgFailures=failures/userCount | rename Authentication.* AS * | xswhere avgFailures from failures_by_src_1d by "src,app" in auth_failures is extreme | iplocation prefix=src src | rename src_City AS src_city, src_Country AS src_country, src_Region as src_region, src_lon AS src_long | lookup dnslookup clientip AS src OUTPUT clienthost AS src_dns

Distinct User Count by src by day

Here we are going to trend the distinct number of users tried per source, without regard to the number of actual failures.

Context Gen:

| tstats summariesonly=true dc(Authentication.user) as userCount from datamodel=Authentication where (nodename=Authentication.Failed_Authentication sourcetype=linux_secure) by _time, Authentication.src, Authentication.app span=1d | stats min(userCount) as min, max(userCount) as max, count by Authentication.src, Authentication.app | rename Authentication.* as * | eval max=if(min=max,min+5,max) | xsCreateDDContext name=users_by_src_1d app=search container=auth_failures scope=app type=domain terms="minimal,low,medium,high,extreme" notes="user count failures by src by day" uom="users" class="src,app"

Search:

Here we use the context to filter our data and find the sources with user counts above medium.

| tstats summariesonly=true dc(Authentication.user) as userCount from datamodel=Authentication where (nodename=Authentication.Failed_Authentication sourcetype=linux_secure) by _time, Authentication.src, Authentication.app span=1d | rename Authentication.* AS * | xswhere userCount from users_by_src_1d in auth_failures by "src,app" is above medium
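If you would rather see how every source scores against this context instead of filtering, the same search works with xsGetWhereCIX (covered earlier); a sketch:

| tstats summariesonly=true dc(Authentication.user) as userCount from datamodel=Authentication where (nodename=Authentication.Failed_Authentication sourcetype=linux_secure) by _time, Authentication.src, Authentication.app span=1d | rename Authentication.* AS * | xsgetwherecix userCount from users_by_src_1d by "src,app" in auth_failures is above medium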

Merge to get the most abusive sources by app

We can actually merge both of these searches together. This lets us run one search over a given time period, reducing our Splunk resource usage and giving us results that match either or both of our conditions.

Combined Search:

This search buckets the time range it runs across into days and then compares against our contexts, which were generated with a one-day period as the target. Normally, for an ES notable search, you would not bucket time with the “by _time” and “span” portions, since you would only be running the search over something like the previous day each day (a variant along those lines is sketched after the CIX discussion below).

| tstats summariesonly=true count AS failures, dc(Authentication.user) as userCount, values(Authentication.user) as targetedUsers, values(Authentication.tag) as tag, values(sourcetype) as orig_sourcetype, values(source) as source, values(host) as host from datamodel=Authentication where (nodename=Authentication.Failed_Authentication sourcetype=linux_secure) by _time, Authentication.src, Authentication.app span=1d | eval avgFailures=failures/userCount | rename Authentication.* AS * | xswhere avgFailures from failures_by_src_1d by "src,app" in auth_failures is extreme OR userCount from users_by_src_1d in auth_failures by "src,app" is above medium | iplocation prefix=src src | rename src_City AS src_city, src_Country AS src_country, src_Region as src_region, src_lon AS src_long | lookup dnslookup clientip AS src OUTPUT clienthost AS src_dns

The thing to note about the CIX value is that anything greater than 0.5 matched both of our contexts to some degree, and a 1.0 matched them both solidly. If the CIX is 0.5 or less, it matched only one of the contexts to some degree. Notice I used “is extreme” on one test and “is above medium” on the other. You can adjust the statements to fit your use case and data.
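As noted above, a scheduled ES correlation search would normally skip the _time/span bucketing and just run over the previous day, with the time range coming from the saved search's schedule rather than the by clause. A hedged sketch of that variant, with the enrichment commands omitted:

| tstats summariesonly=true count AS failures, dc(Authentication.user) as userCount, values(Authentication.user) as targetedUsers, values(Authentication.tag) as tag, values(sourcetype) as orig_sourcetype, values(source) as source, values(host) as host from datamodel=Authentication where (nodename=Authentication.Failed_Authentication sourcetype=linux_secure) by Authentication.src, Authentication.app | eval avgFailures=failures/userCount | rename Authentication.* AS * | xswhere avgFailures from failures_by_src_1d by "src,app" in auth_failures is extreme OR userCount from users_by_src_1d in auth_failures by "src,app" is above medium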

Bonus Comments:

You will notice in the searches above I added some iplocation and dnslookup commands. I also used values() and extra eval functions to add to the field content of the results. This is something you want to do when making Enterprise Security notables. It gives your security analysts data-rich notables that they can often triage without ever drilling down into the original event data.
