Skip to main content

Pull daily rankings into a data warehouse

This tutorial builds a small, repeatable pipeline: once a day, page through the keywords on a domain, pull each keyword's latest ranking, and load one row per keyword into a warehouse table (or a Google Sheet). The result is a time-series you own — ready for BI dashboards, alerting, or joining against your other marketing data.

The shape of the job is:

  1. List the keywords on a domain, paging until you have them all.
  2. Extract the latest ranking for each keyword from the response.
  3. Load one row per keyword into your warehouse, keyed so re-runs are idempotent.
  4. Schedule it to run daily.

:::tip New to the API? Skim the Quickstart and Authentication first. This tutorial assumes you have an API key and the UUID of a domain you track. It leans on the Pagination guide — read that if the header-driven loop below is unfamiliar. :::

What you'll need

  • An API key, created in your Ranktracker account. Send it in the Authorization header with no Bearer prefix.
  • The UUID of a domain you track. List your domains with GET /v1/domains (see the Domains reference) and copy the id of the one you want.
  • A destination — a warehouse table (BigQuery, Snowflake, Postgres, …) or a Google Sheet. The examples write CSV first so you can wire up any loader.

All examples use the production base URL https://api.ranktracker.com; every endpoint lives under /v1.

The source endpoint

Everything comes from one list endpoint:

GET /v1/domains/{domain_uuid}/keywords

It returns a JSON:API array under data. Each element is one tracked keyword, with its metadata (word, searchEngine, location, language, device) and its ranking data under attributes.results. See the full schema in the Keywords reference.

Two parts of results matter for this job:

  • results.summary — a snapshot of the keyword's search metrics (search_volume, traffic, cost_per_click, competition, difficulty).
  • results.history — an array of ranking checks, most recent first. The latest ranking is results.history[0]. Each entry carries the check date (created_at) and the positions, of which the ones you'll usually warehouse are organic_current_position, absolute_current_position, organic_url, and the day/week/month deltas.

A single keyword element looks like this (trimmed to the fields this tutorial uses):

{
"id": "b1e7c2a4-3f9d-4c8a-9e21-77aa0f6c1d33",
"type": "keyword",
"attributes": {
"word": "running shoes",
"searchEngine": { "uuid": "…", "name": "google" },
"device": { "uuid": "…", "name": "desktop" },
"location": { "uuid": "…", "code": "US", "name": "United States", "country_iso_code": "US", "variant": null },
"language": { "uuid": "…", "code": "en", "name": "English" },
"results": {
"summary": {
"search_volume": 40500,
"traffic": 812,
"cost_per_click": 0.94,
"competition": 0.7,
"difficulty": 63
},
"history": [
{
"uuid": "d4c3b2a1-…",
"created_at": "2026-07-03T04:12:07Z",
"organic_url": "https://www.example.com/running-shoes",
"organic_current_position": 4,
"organic_day_change": 1,
"organic_week_change": -2,
"organic_month_change": 3,
"absolute_current_position": 4
}
]
},
"updatedAt": "2026-07-03T04:12:09Z"
}
}

:::note The keyword id is your join key The top-level id is the keyword's UUID. It's stable across days, so it's the natural key for your warehouse rows. Pair it with the check date to get one row per keyword per day — the basis for the idempotent upsert below. :::

Step 1 — Page through the keywords

The keyword list is paginated. Pass page and per_page (max 1000), and read the total page count from the X-Total-Pages response header to know when to stop. For a full export, use a large per_page to keep the request count low.

A single page, with headers dumped:

curl -i "https://api.ranktracker.com/v1/domains/DOMAIN_UUID/keywords?per_page=1000&page=1" \
-H "Authorization: tkn_usr_your_api_key_here"
HTTP/1.1 200 OK
Content-Type: application/json
X-Total-Count: 1840
X-Total-Pages: 2
X-Per-Page: 1000
X-Current-Page: 1

You've fetched everything when X-Current-Page reaches X-Total-Pages. See Pagination for the full header reference and edge cases (an out-of-range page returns an empty data array, not an error).

Step 2 — Extract the latest ranking per keyword

For each keyword element, flatten the fields you care about into one row. The latest ranking is results.history[0]; guard against an empty history array (a brand-new keyword may not have a check yet).

Step 3 — Write rows (CSV, then load)

The script below does Steps 1–3: it pages through every keyword on a domain, flattens one row per keyword, and writes a CSV. Writing CSV first keeps the extract decoupled from the destination — you can hand the file to any loader, or swap the write_csv call for a warehouse insert (shown after).

import csv
import datetime as dt
import requests

BASE = "https://api.ranktracker.com/v1"
HEADERS = {"Authorization": "tkn_usr_your_api_key_here"}
PER_PAGE = 1000

# The snapshot date for this run. Use UTC so daily runs line up regardless of
# where the job executes.
RUN_DATE = dt.datetime.now(dt.timezone.utc).date().isoformat()


def fetch_keywords(domain_uuid):
"""Yield every tracked keyword on the domain, one page at a time."""
page = 1
while True:
resp = requests.get(
f"{BASE}/domains/{domain_uuid}/keywords",
headers=HEADERS,
params={"page": page, "per_page": PER_PAGE},
timeout=30,
)
resp.raise_for_status()
yield from resp.json()["data"]

total_pages = int(resp.headers["X-Total-Pages"])
if page >= total_pages:
break
page += 1


def to_row(keyword):
"""Flatten one keyword element into a warehouse row."""
attrs = keyword["attributes"]
history = attrs["results"]["history"]
latest = history[0] if history else {}
summary = attrs["results"]["summary"]

return {
# Natural key: keyword UUID + snapshot date -> one row per day.
"keyword_uuid": keyword["id"],
"snapshot_date": RUN_DATE,
# Dimensions.
"word": attrs["word"],
"search_engine": attrs["searchEngine"]["name"],
"device": attrs["device"]["name"],
"location": attrs["location"]["name"],
"language": attrs["language"]["name"],
# Latest ranking (from results.history[0]).
"checked_at": latest.get("created_at"),
"organic_position": latest.get("organic_current_position"),
"absolute_position": latest.get("absolute_current_position"),
"organic_url": latest.get("organic_url"),
"day_change": latest.get("organic_day_change"),
"week_change": latest.get("organic_week_change"),
"month_change": latest.get("organic_month_change"),
# Metrics snapshot.
"search_volume": summary.get("search_volume"),
"traffic": summary.get("traffic"),
}


FIELDNAMES = [
"keyword_uuid", "snapshot_date", "word", "search_engine", "device",
"location", "language", "checked_at", "organic_position",
"absolute_position", "organic_url", "day_change", "week_change",
"month_change", "search_volume", "traffic",
]


def write_csv(domain_uuid, path):
with open(path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
writer.writeheader()
count = 0
for keyword in fetch_keywords(domain_uuid):
writer.writerow(to_row(keyword))
count += 1
print(f"Wrote {count} rows to {path}")


if __name__ == "__main__":
write_csv("DOMAIN_UUID", f"rankings_{RUN_DATE}.csv")

Load into a warehouse

Once you have rows, loading is destination-specific. The important part is how you load, not which warehouse — see Step 4 for why an upsert (not an append) is what you want. Sketched for a BigQuery-style destination:

# Pseudocode — swap in your warehouse client.
rows = [to_row(kw) for kw in fetch_keywords("DOMAIN_UUID")]

# Idempotent load: MERGE on (keyword_uuid, snapshot_date) so re-running the
# same day overwrites rather than duplicates. See Step 4.
warehouse.merge(
table="rankings_daily",
rows=rows,
key=["keyword_uuid", "snapshot_date"],
)

If your warehouse client only exposes an append/insert, load into a staging table and MERGE from staging into the target on (keyword_uuid, snapshot_date) — the same idempotency guarantee, one indirection away.

Or write to a Google Sheet

For a lightweight destination, append rows to a sheet with the Sheets API. Key by keyword_uuid + snapshot_date the same way: read the existing rows for today's date, and clear them before writing so a re-run replaces rather than stacks.

Step 4 — Upsert idempotently

Scheduled jobs get retried, re-run after a fix, or triggered twice. If your load appends, any of those doubles your rows for the day and quietly corrupts every downstream average. Make the load idempotent so re-running a day is a no-op.

The rule: key each row on keyword_uuid + snapshot_date and MERGE / UPSERT on that key rather than INSERT.

  • keyword_uuid — the keyword's UUID (the top-level id), stable across days.
  • snapshot_date — the run's UTC date (not checked_at; see the note below). This is the grain of your table: one row per keyword per day.
-- Target table: one row per keyword per day.
CREATE TABLE IF NOT EXISTS rankings_daily (
keyword_uuid STRING NOT NULL,
snapshot_date DATE NOT NULL,
word STRING,
search_engine STRING,
device STRING,
location STRING,
language STRING,
checked_at TIMESTAMP,
organic_position INT64,
absolute_position INT64,
organic_url STRING,
day_change INT64,
week_change INT64,
month_change INT64,
search_volume INT64,
traffic INT64,
PRIMARY KEY (keyword_uuid, snapshot_date) NOT ENFORCED
);

-- Re-runnable load: MERGE from a staging table on the natural key.
MERGE rankings_daily AS target
USING staging_rankings AS source
ON target.keyword_uuid = source.keyword_uuid
AND target.snapshot_date = source.snapshot_date
WHEN MATCHED THEN UPDATE SET
organic_position = source.organic_position,
absolute_position = source.absolute_position,
organic_url = source.organic_url,
day_change = source.day_change,
week_change = source.week_change,
month_change = source.month_change,
checked_at = source.checked_at,
search_volume = source.search_volume,
traffic = source.traffic
WHEN NOT MATCHED THEN INSERT ROW;

Run the job twice on the same day and you get the same table state — exactly what you want from an idempotent pipeline.

:::note snapshot_date vs. checked_at Key on snapshot_date (your run's UTC date), not the API's checked_at. Rankings are recomputed on Ranktracker's own schedule, so checked_at can lag your run or be identical across two of your runs. Using your run date as the grain keeps exactly one row per keyword per day and makes re-runs deterministic. Keep checked_at as a column — it's useful to know when the ranking was actually measured — just don't key on it. :::

Step 5 — Schedule it daily

Rankings update roughly once a day, so a daily pull is the natural cadence. Run the script from whatever scheduler you already operate — cron, a cloud scheduler, or your orchestrator (Airflow, Dagster, …). A plain cron entry that runs at 06:00 UTC:

0 6 * * * /usr/bin/python3 /opt/jobs/pull_rankings.py >> /var/log/pull_rankings.log 2>&1

Because the load is idempotent (Step 4), it's safe to let the scheduler retry a failed run, or to backfill by re-running — neither duplicates data.

:::tip Pick a stable daily hour Run at the same UTC hour every day and derive snapshot_date in UTC (as the script does). That keeps your table on a clean one-row-per-day grain and avoids a run near local midnight landing on the "wrong" date. :::

Handle throttling and errors

A long paging loop makes many requests, so build in resilience:

  • 503 — throttled. Throttled requests currently return HTTP 503 (there are no RateLimit-* headers yet). Retry with exponential backoff. A large per_page helps by reducing how many requests you make.
  • 403 — API access disabled or forbidden. Confirm API access is enabled on your plan (apiEnabled in GET /v1/account/usage) and that the domain belongs to your account.
  • 404 — domain not found. A wrong UUID, or a domain in another account, returns 404. Double-check the domain_uuid.

Errors use the JSON:API error envelope:

{
"errors": [
{ "code": "throttled", "status": 503, "detail": "Too many requests." }
]
}

A minimal retry wrapper for the fetch, backing off on 503:

import time
import requests

def get_with_retry(url, *, headers, params, max_attempts=5):
for attempt in range(max_attempts):
resp = requests.get(url, headers=headers, params=params, timeout=30)
if resp.status_code == 503:
time.sleep(2 ** attempt) # 1s, 2s, 4s, 8s, 16s
continue
resp.raise_for_status()
return resp
raise RuntimeError(f"Gave up after {max_attempts} attempts: {url}")

See Errors & rate limits for the full status table and retry guidance.

Notes on the data

  • Empty history. A keyword that hasn't been checked yet has an empty results.history array. The to_row helper handles this by leaving the position fields empty — those rows fill in once the first check lands.
  • Positions can be null. If the domain doesn't rank for a keyword, position fields may be absent. Store them as nullable columns.
  • Paging during writes. If keywords are added or removed mid-export, a record can shift between pages. For an exact snapshot, page quickly and de-duplicate by keyword_uuid after collecting all pages — see the edge cases in Pagination.
  • Quota. Reading rankings doesn't consume tracking quota, but the keywords you export do count against your plan. Check limits with Usage & quota.
  • Pagination — the page/per_page params and the X-Total-* headers this job loops on.
  • Errors & rate limits — the full error envelope and the 503 retry-with-backoff strategy.
  • Bulk operations — batching writes when you're managing what you track, not just reading it.
  • Core concepts — the JSON:API envelope and resource model.
  • Keywords reference — the full keyword list schema, including every results.history field.