Skip to content

Commit

Permalink
Add src ip to canarydrop on token create (#597)
Browse files Browse the repository at this point in the history
* Expand the app and the canarydrop to save the IP

* Add test for the created_from_ip and created_from_ip_x_forwarded_for

* Update reading src from host and add testing for frontend
  • Loading branch information
thinkst-pieter authored Oct 31, 2024
1 parent ef84f1b commit bd0474c
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 0 deletions.
2 changes: 2 additions & 0 deletions canarytokens/canarydrop.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class Canarydrop(BaseModel):
memo: str = ""
# Make created_at v2 compatible - add timestamp as alias.
created_at: datetime = Field(default_factory=datetime.utcnow, alias="timestamp")
created_from_ip: Optional[str]
created_from_ip_x_forwarded_for: Optional[str]

auth: str = Field(default_factory=make_auth_token)
type: TokenTypes
Expand Down
16 changes: 16 additions & 0 deletions frontend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,12 +445,20 @@ async def generate(request: Request) -> AnyTokenResponse: # noqa: C901 # gen i
else:
kube_config = None
canarytoken = Canarytoken()

src_ip = request.headers.get(switchboard_settings.REAL_IP_HEADER) or (
request.client.host if request.client else ""
)
x_forwarded_for = request.headers.get("x-forwarded-for") or ""

canarydrop = Canarydrop(
type=token_request_details.token_type,
alert_email_enabled=True if token_request_details.email else False,
alert_email_recipient=token_request_details.email,
alert_webhook_enabled=True if token_request_details.webhook_url else False,
alert_webhook_url=token_request_details.webhook_url or "",
created_from_ip=src_ip,
created_from_ip_x_forwarded_for=x_forwarded_for,
canarytoken=canarytoken,
memo=token_request_details.memo,
browser_scanner_enabled=False,
Expand Down Expand Up @@ -763,12 +771,20 @@ async def api_generate( # noqa: C901 # gen is large
else:
kube_config = None
canarytoken = Canarytoken()

src_ip = request.headers.get(switchboard_settings.REAL_IP_HEADER) or (
request.client.host if request.client else ""
)
x_forwarded_for = request.headers.get("x-forwarded-for") or ""

canarydrop = Canarydrop(
type=token_request_details.token_type,
alert_email_enabled=True if token_request_details.email else False,
alert_email_recipient=token_request_details.email,
alert_webhook_enabled=True if token_request_details.webhook_url else False,
alert_webhook_url=token_request_details.webhook_url or "",
created_from_ip=src_ip,
created_from_ip_x_forwarded_for=x_forwarded_for,
canarytoken=canarytoken,
memo=token_request_details.memo,
browser_scanner_enabled=False,
Expand Down
7 changes: 7 additions & 0 deletions tests/units/test_canarydrops.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,19 @@ def test_canarydrop(token_type):
alert_webhook_url=None,
canarytoken=canarytoken,
memo="memo",
created_from_ip="127.0.100.1",
created_from_ip_x_forwarded_for="127.0.200.1",
browser_scanner_enabled=False,
redirect_url="https://youtube.com",
)
save_canarydrop(cd)
cd_retrieved = get_canarydrop(canarytoken)
assert cd_retrieved.memo == cd.memo
assert cd_retrieved.created_from_ip == cd.created_from_ip
assert (
cd_retrieved.created_from_ip_x_forwarded_for
== cd.created_from_ip_x_forwarded_for
)
assert cd_retrieved.canarytoken.value() == cd.canarytoken.value()
if cd_retrieved.type in [TokenTypes.SLOW_REDIRECT, TokenTypes.FAST_REDIRECT]:
assert cd_retrieved.redirect_url == "https://youtube.com"
Expand Down
55 changes: 55 additions & 0 deletions tests/units/test_frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from canarytokens.settings import FrontendSettings, SwitchboardSettings
from canarytokens.tokens import Canarytoken
from tests.utils import get_basic_hit, get_token_request
from frontend.app import ROOT_API_ENDPOINT


def test_read_docs(test_client: TestClient) -> None:
Expand Down Expand Up @@ -760,3 +761,57 @@ def test_block_user(
json=token_request.json_safe_dict(),
)
assert not resp.json()["error"]


@pytest.mark.parametrize(
"headers, expected_headers",
[
pytest.param(
{
"x-real-ip": "127.0.300.1",
"x-forwarded-for": "127.0.400.1",
},
{
"created_from_ip": "127.0.300.1",
"created_from_ip_x_forwarded_for": "127.0.400.1",
},
id="ValidHeaders",
),
pytest.param(
{},
{
"created_from_ip": "",
"created_from_ip_x_forwarded_for": "",
},
id="EmptyHeaders",
),
],
)
@pytest.mark.parametrize(
"token_request_type, token_response_type",
zip(set_of_request_classes, set_of_response_classes),
)
def test_generate_token_ip_headers(
token_request_type: AnyTokenRequest,
token_response_type: AnyTokenResponse,
test_client: TestClient,
setup_db: None,
headers: dict[str, str],
expected_headers: dict[str, str],
) -> None:
resp = test_client.post(
"/generate", data=get_token_request(token_request_type).json(), headers=headers
)
token_resp = token_response_type(**resp.json())
manage_resp = test_client.get(
f"{ROOT_API_ENDPOINT}/manage",
params=ManagePageRequest(
token=token_resp.token,
auth=token_resp.auth_token,
).dict(),
follow_redirects=True,
)
assert manage_resp.status_code == 200
canarydrop = manage_resp.json()["canarydrop"]
for key, value in expected_headers.items():
assert canarydrop[key] == value

0 comments on commit bd0474c

Please sign in to comment.