WORKFLOW AUTHORING

Write your own
security workflow.

Any alert, any response chain. Drop a YAML file — CipherRun executes every step: HTTP calls, LLM reasoning, conditional branching, Slack notifications. No code, no containers.

What a workflow looks like

Every workflow is a single YAML file with four top-level blocks. The engine reads the file, resolves template variables, and executes each step in order — pausing on conditional branches and routing accordingly.

phishing-triage.yaml
name: Phishing Email Triage
version: "1.0"
trigger: email_received
description: >
  Extract IOCs, classify with GPT-4o-mini,
  score 0-100, route to quarantine or monitor.

inputs:
  - name: sender
    description: "From address"
  - name: subject
  - name: body
  - name: headers

steps:

  # Extract URLs from the email body
  - id: extract_urls
    type: regex.extract
    with:
      input:   "{{ inputs.body }}"
      pattern: "https?://[^\s]+"
      global:  true

  # LLM classification
  - id: analyze_intent
    type: llm.analyze
    with:
      model:       gpt-4o-mini
      json_output: true
      prompt:      "Classify this email: {{ inputs.body }}"

  # Score 0-100 from signals
  - id: score
    type: transform
    with:
      compute:
        final_score: "clamp(ioc + llm, 0, 100)"

  # Branch: quarantine if score >= 70
  - id: route
    type: conditional
    with:
      if: "steps.score.output.final_score >= 70"

  - id: quarantine
    type: output
    if: "steps.route.output.result == true"
    with:
      value:
        action: quarantine
        score:  "{{ steps.score.output.final_score }}"
name / version / trigger
Identity block. trigger is the event type that starts this workflow: email_received, alert_triggered, or file_detected. Used for routing in the engine — only affects which queue picks up the event.
inputs
Named fields the caller must supply at runtime. Reference them anywhere in step configs with {{ inputs.field_name }}. Optional fields can be listed without a required: true key — they'll be empty string if missing.
steps
Ordered list of actions. Each step has an id (used to reference its output), a type (the step handler), and a with block of handler-specific config. Steps run sequentially. On failure, the default is to halt — add on_failure: continue to keep going.
{{ template expressions }}
Double-brace interpolation is available in any string field. Reference inputs with {{ inputs.name }}, prior step outputs with {{ steps.step_id.output.field }}. Arrays use index notation: {{ steps.extract_urls.output.all[0] }}.
if: (conditional execution)
Add an if: field to any step to make it conditional. The expression is evaluated as JavaScript. Use it on output steps after a conditional branch to implement true/false routing paths.

Every supported step type

All five types are live in production across the workflow library. Mix and match in any order. The conditional type is the branching primitive — everything else is a leaf node.

transform Compute derived values from inputs or prior step outputs using JavaScript expressions. No network calls — pure in-process math, string ops, and logic.

REQUIRED

  • compute — map of key: "expression" pairs

OPTIONAL

  • on_failurecontinue or halt (default: halt)
EXAMPLE
id: score
type: transform
with:
  compute:
    attempt_count: "Number('{{ inputs.failed_count }}')"
    is_burst:      "attempt_count > 10 ? 'true' : 'false'"
    risk_score:    "Math.min(attempt_count * 8, 100)"
http.get / http.post Make outbound HTTP requests to any API — threat intel, enrichment, SIEMs, ticketing systems. Response body is parsed as JSON automatically and available in steps.id.output.body.

REQUIRED

  • url — full URL, may contain {{ }} refs

OPTIONAL

  • headers — key/value map
  • body — key/value map (POST only)
  • on_failurecontinue or halt
EXAMPLE — crt.sh (cert transparency, free, no API key)
id: crt_lookup
type: http.get
on_failure: continue
with:
  url: "https://crt.sh/?q=%25{{ inputs.brand_token }}%25&output=json&deduplicate=Y"
  headers:
    Accept: "application/json"
# Used in the Brand Impersonation & Typosquat Hunter workflow to pull all
# TLS certificates containing a brand name as a substring — surfaces typosquats
# and domain squatters before they launch phishing campaigns.
llm.analyze Send a prompt to GPT-4o-mini (or gpt-4o) and get structured JSON back. Use for classification, risk summarization, MITRE ATT&CK mapping, or any reasoning task that needs natural language understanding.

REQUIRED

  • prompt — the user message, supports {{ }}

OPTIONAL

  • modelgpt-4o-mini (default) or gpt-4o
  • system — system prompt
  • json_outputtrue forces JSON mode
  • temperature — 0–1, default 0.1
  • max_tokens — default 600
EXAMPLE
id: summarize_risk
type: llm.analyze
with:
  model:       gpt-4o-mini
  json_output: true
  system:      "You are a senior cloud security analyst."
  prompt: |
    Bucket: {{ inputs.bucket_name }}
    Permissions: {{ steps.extract_perms.output.acl }}
    Return JSON: { risk, rationale, mitre_technique }
conditional Evaluate a JavaScript expression against step outputs and set output.result to true or false. Downstream steps use if: fields to branch on this result.
⑂  This is the only step type that supports branching / conditional execution

REQUIRED

  • if — JavaScript expression returning truthy/falsy

OUTPUT

  • resulttrue or false
EXAMPLE
# Set the branch gate
id: route
type: conditional
with:
  if: "steps.score.output.risk_score >= 60"

# High-risk path
id: escalate
type: output
if: "steps.route.output.result == true"
with:
  value:
    action: page_on_call

# Low-risk path
id: log_only
type: output
if: "steps.route.output.result == false"
with:
  value:
    action: monitor
output Terminate the workflow and emit a structured result object. Typically placed at the end of each branch. The value map becomes the workflow's final output, available in the execution log and API response.

REQUIRED

  • value — key/value map of result fields, supports {{ }}

OPTIONAL

  • if — condition guard (skip if false)
EXAMPLE
id: result
type: output
with:
  value:
    verdict:   "{{ steps.summarize_risk.output.parsed.risk }}"
    rationale: "{{ steps.summarize_risk.output.parsed.rationale }}"
    action:    remediate
    score:     "{{ steps.score.output.risk_score }}"
regex.extract Run a regular expression against any string input and return the matched values as an array. Used for IOC extraction, log parsing, and pulling structured data out of unstructured text.

REQUIRED

  • input — string to search, supports {{ }}
  • pattern — regex pattern string

OPTIONAL

  • globaltrue for all matches (default: first only)
  • flags — e.g. "gi"
EXAMPLE
id: extract_ips
type: regex.extract
with:
  input:   "{{ inputs.log_line }}"
  pattern: "\\b(\\d{1,3}\\.){3}\\d{1,3}\\b"
  global:  true

Two custom workflows, ready to copy

Runnable YAML — not pseudocode. Drop either file into CipherRun and it executes as written. Both are fully commented so you can modify them for your environment.

EXAMPLE A

S3 Bucket Public Exposure Check

Fires when a new S3 bucket is detected or a policy change event arrives. Calls the AWS-compatible GetBucketAcl endpoint, extracts the permission grants, runs an LLM risk assessment, then branches on severity — high risk triggers a block-public-access remediation output; low risk logs and monitors.

parse_event fetch_bucket_acl extract_grantees assess_risk score route remediate / monitor
s3-public-exposure.yaml
name: S3 Bucket Public Exposure Check
version: "1.0"
trigger: alert_triggered
description: >
  Checks S3 bucket ACL for public grantees, assesses risk with
  GPT-4o-mini, and routes to block-public-access or monitor.

inputs:
  - name: bucket_name
    description: "S3 bucket name (e.g. my-company-assets)"
  - name: aws_region
    description: "AWS region (e.g. us-east-1)"
  - name: account_id
    description: "AWS account ID for context"
  - name: event_source
    description: "Who/what triggered this check (CloudTrail, scheduler)"

steps:

  # 1. Parse and normalise inputs
  - id: parse_event
    type: transform
    with:
      compute:
        bucket:  "'{{ inputs.bucket_name }}'"
        region:  "'{{ inputs.aws_region }}' || 'us-east-1'"
        account: "'{{ inputs.account_id }}'"

  # 2. Fetch bucket ACL from AWS-compatible endpoint
  #    Replace with your real AWS API gateway or signed-request proxy.
  - id: fetch_bucket_acl
    type: http.get
    on_failure: continue
    with:
      url: "https://s3.{{ inputs.aws_region }}.amazonaws.com/{{ inputs.bucket_name }}?acl"
      headers:
        Accept: "application/json"

  # 3. Extract public grantees from the ACL response
  #    AllUsers / AuthenticatedUsers URIs indicate public exposure.
  - id: extract_grantees
    type: transform
    with:
      compute:
        acl_raw:       "JSON.stringify('{{ steps.fetch_bucket_acl.output.body }}')"
        is_public:     "JSON.stringify('{{ steps.fetch_bucket_acl.output.body }}').includes('AllUsers') ? 'true' : 'false'"
        is_auth_users: "JSON.stringify('{{ steps.fetch_bucket_acl.output.body }}').includes('AuthenticatedUsers') ? 'true' : 'false'"
        acl_summary:   "JSON.stringify('{{ steps.fetch_bucket_acl.output.body }}').includes('AllUsers') ? 'PUBLIC: AllUsers read access detected' : 'No AllUsers grantee found'"

  # 4. LLM risk assessment — MITRE ATT&CK + remediation advice
  - id: assess_risk
    type: llm.analyze
    with:
      model:       gpt-4o-mini
      temperature: 0.1
      json_output: true
      system:      "You are a senior cloud security engineer specialising in AWS S3 misconfigurations."
      prompt: |
        Bucket name: {{ inputs.bucket_name }}
        Region: {{ inputs.aws_region }}
        Account: {{ inputs.account_id }}
        ACL summary: {{ steps.extract_grantees.output.acl_summary }}
        Is public (AllUsers): {{ steps.extract_grantees.output.is_public }}
        Is auth-users exposed: {{ steps.extract_grantees.output.is_auth_users }}

        Assess the exposure risk. Return JSON with these fields:
        { "risk": "critical|high|medium|low", "confidence": 0.0-1.0,
          "rationale": "...", "mitre_technique": "T####.###",
          "recommended_action": "...", "urgency": "immediate|24h|72h|monitor" }

  # 5. Compute numeric risk score
  - id: score
    type: transform
    with:
      compute:
        public_bonus:    "'{{ steps.extract_grantees.output.is_public }}' === 'true' ? 50 : 0"
        auth_bonus:      "'{{ steps.extract_grantees.output.is_auth_users }}' === 'true' ? 30 : 0"
        confidence_mul:  "Number('{{ steps.assess_risk.output.parsed.confidence }}') || 0.5"
        base_risk:       "(('{{ steps.extract_grantees.output.is_public }}' === 'true') ? 50 : 0) + (('{{ steps.extract_grantees.output.is_auth_users }}' === 'true') ? 30 : 0)"
        risk_score:      "Math.min(Math.round(((('{{ steps.extract_grantees.output.is_public }}' === 'true') ? 50 : 0) + (('{{ steps.extract_grantees.output.is_auth_users }}' === 'true') ? 30 : 0)) * (Number('{{ steps.assess_risk.output.parsed.confidence }}') || 0.8) + 10), 100)"

  # 6. Branch on risk score — remediate if >= 60
  - id: route
    type: conditional
    with:
      if: "steps.score.output.risk_score >= 60"

  # 7a. High-risk path: emit block-public-access remediation
  - id: remediate
    type: output
    if: "steps.route.output.result == true"
    with:
      value:
        verdict:              "{{ steps.assess_risk.output.parsed.risk }}"
        score:                "{{ steps.score.output.risk_score }}"
        recommended_action:   block_public_access
        mitre_technique:      "{{ steps.assess_risk.output.parsed.mitre_technique }}"
        rationale:            "{{ steps.assess_risk.output.parsed.rationale }}"
        urgency:              "{{ steps.assess_risk.output.parsed.urgency }}"
        is_public:            "{{ steps.extract_grantees.output.is_public }}"
        bucket:               "{{ inputs.bucket_name }}"

  # 7b. Low-risk path: log and monitor
  - id: monitor
    type: output
    if: "steps.route.output.result == false"
    with:
      value:
        verdict:            "{{ steps.assess_risk.output.parsed.risk }}"
        score:              "{{ steps.score.output.risk_score }}"
        recommended_action: monitor
        rationale:          "{{ steps.assess_risk.output.parsed.rationale }}"
        bucket:             "{{ inputs.bucket_name }}"

EXAMPLE B

Failed Login Burst → Slack Escalation

Triggers when an auth system reports a spike of failed logins for a single account. Counts attempts against a configurable time window, fetches user context, runs an LLM to classify the attack pattern, then — above a threshold — posts a formatted Slack alert to the security channel with a recommended response action.

parse_event count_attempts route_threshold fetch_user_context classify_attack post_slack result
failed-login-burst.yaml
name: Failed Login Burst to Slack Escalation
version: "1.0"
trigger: alert_triggered
description: >
  Counts failed logins in a rolling window, classifies the attack
  pattern with GPT-4o-mini, and escalates via Slack if count >= 10.

inputs:
  - name: user_id
    description: "Target account identifier (email or UUID)"
  - name: failed_count
    description: "Number of failed attempts in the window"
  - name: window_minutes
    description: "Rolling window size in minutes (e.g. 15)"
  - name: source_ips
    description: "Comma-separated list of source IPs seen in the burst"
  - name: user_lookup_url
    description: "Internal API endpoint to fetch user context"
  - name: slack_webhook_url
    description: "Slack incoming webhook URL for the security channel"

steps:

  # 1. Parse inputs and compute burst rate
  - id: parse_event
    type: transform
    with:
      compute:
        user:          "'{{ inputs.user_id }}'"
        count:         "Number('{{ inputs.failed_count }}') || 0"
        window:        "Number('{{ inputs.window_minutes }}') || 15"
        ip_list:       "'{{ inputs.source_ips }}'"
        unique_ips:    "new Set('{{ inputs.source_ips }}'.split(',').map(s=>s.trim())).size"

  # 2. Score the burst — attempts per minute + unique IP weighting
  - id: count_attempts
    type: transform
    with:
      compute:
        attempts_per_min: "Math.round((Number('{{ inputs.failed_count }}') || 0) / Math.max(Number('{{ inputs.window_minutes }}') || 15, 1) * 10) / 10"
        ip_spread_score:  "Math.min((new Set('{{ inputs.source_ips }}'.split(',').map(s=>s.trim())).size) * 5, 25)"
        volume_score:     "Math.min(Math.round((Number('{{ inputs.failed_count }}') || 0) * 4), 75)"
        burst_score:      "Math.min(Math.round((Number('{{ inputs.failed_count }}') || 0) * 4) + Math.min((new Set('{{ inputs.source_ips }}'.split(',').map(s=>s.trim())).size) * 5, 25), 100)"

  # 3. Route: only escalate if failed_count >= 10
  - id: route_threshold
    type: conditional
    with:
      if: "Number('{{ inputs.failed_count }}') >= 10"

  # 4. Fetch user context from your internal directory
  #    Skip gracefully if endpoint is unavailable.
  - id: fetch_user_context
    type: http.get
    on_failure: continue
    if: "steps.route_threshold.output.result == true"
    with:
      url: "{{ inputs.user_lookup_url }}/{{ inputs.user_id }}"
      headers:
        Accept: "application/json"

  # 5. LLM: classify the attack pattern
  - id: classify_attack
    type: llm.analyze
    if: "steps.route_threshold.output.result == true"
    with:
      model:       gpt-4o-mini
      temperature: 0.1
      json_output: true
      system:      "You are a SOC analyst specialising in account-takeover and credential-stuffing detection."
      prompt: |
        Account: {{ inputs.user_id }}
        Failed attempts: {{ inputs.failed_count }} in {{ inputs.window_minutes }} minutes
        Source IPs: {{ inputs.source_ips }}
        Burst score: {{ steps.count_attempts.output.burst_score }}/100

        Classify the attack pattern and return JSON:
        { "classification": "credential_stuffing|brute_force|password_spray|benign",
          "confidence": 0.0-1.0, "reasoning": "...",
          "recommended_action": "lock_account|force_mfa|monitor|no_action",
          "mitre_technique": "T####" }

  # 6. Post Slack alert — formatted security notification
  - id: post_slack
    type: http.post
    on_failure: continue
    if: "steps.route_threshold.output.result == true"
    with:
      url: "{{ inputs.slack_webhook_url }}"
      headers:
        Content-Type: "application/json"
      body:
        text: "*:rotating_light: Failed Login Burst Detected*"
        blocks:
          - type: section
            text:
              type: mrkdwn
              text: "*:rotating_light: Failed Login Burst — {{ inputs.failed_count }} attempts in {{ inputs.window_minutes }}m*\n*Account:* {{ inputs.user_id }}\n*Pattern:* {{ steps.classify_attack.output.parsed.classification }}\n*Action:* {{ steps.classify_attack.output.parsed.recommended_action }}\n*Score:* {{ steps.count_attempts.output.burst_score }}/100"

  # 7. Emit final result
  - id: result
    type: output
    if: "steps.route_threshold.output.result == true"
    with:
      value:
        verdict:            "{{ steps.classify_attack.output.parsed.classification }}"
        burst_score:        "{{ steps.count_attempts.output.burst_score }}"
        failed_count:       "{{ inputs.failed_count }}"
        recommended_action: "{{ steps.classify_attack.output.parsed.recommended_action }}"
        mitre_technique:    "{{ steps.classify_attack.output.parsed.mitre_technique }}"
        reasoning:          "{{ steps.classify_attack.output.parsed.reasoning }}"
        slack_notified:     true

  # Below-threshold path: log and exit quietly
  - id: below_threshold
    type: output
    if: "steps.route_threshold.output.result == false"
    with:
      value:
        verdict:            below_threshold
        failed_count:       "{{ inputs.failed_count }}"
        recommended_action: monitor
        slack_notified:     false

04 — GET STARTED

Want us to build a custom workflow for your environment?

Tell us your stack and the alert that's eating your team's time. We'll scope and ship a custom workflow — usually in under a week.

Got it — check your inbox.