diff --git a/canarytokens/canarydrop.py b/canarytokens/canarydrop.py index adefe1240..856558b84 100644 --- a/canarytokens/canarydrop.py +++ b/canarytokens/canarydrop.py @@ -72,6 +72,8 @@ class Canarydrop(BaseModel): memo: str = "" # Make created_at v2 compatible - add timestamp as alias. created_at: datetime = Field(default_factory=datetime.utcnow, alias="timestamp") + created_from_ip: Optional[str] + created_from_ip_x_forwarded_for: Optional[str] auth: str = Field(default_factory=make_auth_token) type: TokenTypes diff --git a/frontend/app.py b/frontend/app.py index e147f3a6b..a012dbfeb 100644 --- a/frontend/app.py +++ b/frontend/app.py @@ -445,12 +445,20 @@ async def generate(request: Request) -> AnyTokenResponse: # noqa: C901 # gen i else: kube_config = None canarytoken = Canarytoken() + + src_ip = request.headers.get(switchboard_settings.REAL_IP_HEADER) or ( + request.client.host if request.client else "" + ) + x_forwarded_for = request.headers.get("x-forwarded-for") or "" + canarydrop = Canarydrop( type=token_request_details.token_type, alert_email_enabled=True if token_request_details.email else False, alert_email_recipient=token_request_details.email, alert_webhook_enabled=True if token_request_details.webhook_url else False, alert_webhook_url=token_request_details.webhook_url or "", + created_from_ip=src_ip, + created_from_ip_x_forwarded_for=x_forwarded_for, canarytoken=canarytoken, memo=token_request_details.memo, browser_scanner_enabled=False, @@ -763,12 +771,20 @@ async def api_generate( # noqa: C901 # gen is large else: kube_config = None canarytoken = Canarytoken() + + src_ip = request.headers.get(switchboard_settings.REAL_IP_HEADER) or ( + request.client.host if request.client else "" + ) + x_forwarded_for = request.headers.get("x-forwarded-for") or "" + canarydrop = Canarydrop( type=token_request_details.token_type, alert_email_enabled=True if token_request_details.email else False, alert_email_recipient=token_request_details.email, alert_webhook_enabled=True if token_request_details.webhook_url else False, alert_webhook_url=token_request_details.webhook_url or "", + created_from_ip=src_ip, + created_from_ip_x_forwarded_for=x_forwarded_for, canarytoken=canarytoken, memo=token_request_details.memo, browser_scanner_enabled=False, diff --git a/tests/units/test_canarydrops.py b/tests/units/test_canarydrops.py index 9857013bb..f4113e736 100644 --- a/tests/units/test_canarydrops.py +++ b/tests/units/test_canarydrops.py @@ -34,12 +34,19 @@ def test_canarydrop(token_type): alert_webhook_url=None, canarytoken=canarytoken, memo="memo", + created_from_ip="127.0.100.1", + created_from_ip_x_forwarded_for="127.0.200.1", browser_scanner_enabled=False, redirect_url="https://youtube.com", ) save_canarydrop(cd) cd_retrieved = get_canarydrop(canarytoken) assert cd_retrieved.memo == cd.memo + assert cd_retrieved.created_from_ip == cd.created_from_ip + assert ( + cd_retrieved.created_from_ip_x_forwarded_for + == cd.created_from_ip_x_forwarded_for + ) assert cd_retrieved.canarytoken.value() == cd.canarytoken.value() if cd_retrieved.type in [TokenTypes.SLOW_REDIRECT, TokenTypes.FAST_REDIRECT]: assert cd_retrieved.redirect_url == "https://youtube.com" diff --git a/tests/units/test_frontend.py b/tests/units/test_frontend.py index 4c3b511fa..3125e4a71 100644 --- a/tests/units/test_frontend.py +++ b/tests/units/test_frontend.py @@ -66,6 +66,7 @@ from canarytokens.settings import FrontendSettings, SwitchboardSettings from canarytokens.tokens import Canarytoken from tests.utils import get_basic_hit, get_token_request +from frontend.app import ROOT_API_ENDPOINT def test_read_docs(test_client: TestClient) -> None: @@ -760,3 +761,57 @@ def test_block_user( json=token_request.json_safe_dict(), ) assert not resp.json()["error"] + + +@pytest.mark.parametrize( + "headers, expected_headers", + [ + pytest.param( + { + "x-real-ip": "127.0.300.1", + "x-forwarded-for": "127.0.400.1", + }, + { + "created_from_ip": "127.0.300.1", + "created_from_ip_x_forwarded_for": "127.0.400.1", + }, + id="ValidHeaders", + ), + pytest.param( + {}, + { + "created_from_ip": "", + "created_from_ip_x_forwarded_for": "", + }, + id="EmptyHeaders", + ), + ], +) +@pytest.mark.parametrize( + "token_request_type, token_response_type", + zip(set_of_request_classes, set_of_response_classes), +) +def test_generate_token_ip_headers( + token_request_type: AnyTokenRequest, + token_response_type: AnyTokenResponse, + test_client: TestClient, + setup_db: None, + headers: dict[str, str], + expected_headers: dict[str, str], +) -> None: + resp = test_client.post( + "/generate", data=get_token_request(token_request_type).json(), headers=headers + ) + token_resp = token_response_type(**resp.json()) + manage_resp = test_client.get( + f"{ROOT_API_ENDPOINT}/manage", + params=ManagePageRequest( + token=token_resp.token, + auth=token_resp.auth_token, + ).dict(), + follow_redirects=True, + ) + assert manage_resp.status_code == 200 + canarydrop = manage_resp.json()["canarydrop"] + for key, value in expected_headers.items(): + assert canarydrop[key] == value