Skip to content

Commit 14c4ae6

Browse files
authored
Merge pull request #1 from Envirotrust/feature/flood-etl-endpoint
Pull Request: Load Data from Local Parquet File & Expose API Endpoint
2 parents ef2cb49 + 43a797c commit 14c4ae6

File tree

19 files changed

+862
-0
lines changed

19 files changed

+862
-0
lines changed

.env

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
AWS_ACCESS_KEY_ID = 'AKIATDC35CBYPE4BW3U4'
2+
AWS_SECRET_ACCESS_KEY = '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'
3+
AWS_REGION = 'eu-west-1'

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/data
2+
/playground
3+
*.venv
4+
.html
5+
*.pyc

Makefile

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
SHELL := /bin/bash
2+
3+
venv:
4+
@echo "Creating virtual environment..."
5+
@python -m venv venv
6+
@echo "Virtual environment created successfully."
7+
8+
activate:
9+
@echo "Activating virtual environment..."
10+
@source venv/bin/activate
11+
@echo "Virtual environment activated. Use 'deactivate' to exit."
12+
13+
install:
14+
@echo "Installing dependencies..."
15+
@pip install -r requirements.txt
16+
@echo "Dependencies installed successfully."
17+
18+
format:
19+
@echo "Formatting code..."
20+
@black --line-length 79 --target-version py310 --skip-string-normalization --exclude venv,build,dist,docs .
21+
@isort --profile black --line-length 79 --skip venv,build,dist,docs .
22+
@pylint --max-line-length 79 --ignore=E501,W503 --exclude venv,build,dist,docs .
23+
@echo "Code formatted successfully."

api/__init__.py

Whitespace-only changes.

api/crud/__init__.py

Whitespace-only changes.

api/crud/flood.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
"""
2+
Author: Ajeyomi Adedoyin Samuel
3+
4+
Title: Flood Risk Intersect Checker
5+
6+
Description:
7+
------------
8+
This script Loads flood risk polygon data(parquet file), loads it using duckdb,
9+
and check if the coordinate point(s) intersect with the flood-prone areas.
10+
11+
IT accepts Geoparquet with lon and lat of a location and applies spatial
12+
analysis to determine whether input geographic points fall within any of the designated flood zones.
13+
14+
Modules:
15+
--------
16+
- duckdb
17+
- typing
18+
- shapely
19+
20+
Functions:
21+
---------
22+
get_flood_information(bucket, key, longitude, latitude)
23+
Get a geographical coordinates as input and checks if they intersect with flood zones,
24+
and returns the results as JSON.
25+
26+
Usage:
27+
------
28+
- Ensure you put the correct path to your parqet file
29+
- Call `get_flood_information()` with either single or list of `longitude` and `latitude` values.
30+
31+
Note:
32+
-----
33+
- This script assumes that the geospatial data is readable by duckdb and contains a geometry column.
34+
"""
35+
36+
37+
38+
import sys
39+
import os
40+
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
41+
42+
from .utils.duckdb import setup_spatial_extensions
43+
from .utils.utils import validate_coordinates
44+
import duckdb
45+
import json
46+
from shapely.geometry import Point
47+
from core.config import FLOOD_DATA
48+
49+
50+
51+
def get_flood_information(
52+
point_lon: float,
53+
point_lat: float,
54+
parquet_path:str=FLOOD_DATA,
55+
buffer_degrees: float = 0.00001 # ~0.1m buffer for pre-filtering
56+
) -> json:
57+
"""Check if a coordinate intersect with flood zones"""
58+
59+
# validate the input coordinates
60+
validate_coordinates(point_lon, point_lat)
61+
62+
# Setup spatial extensions
63+
setup_spatial_extensions()
64+
65+
# Create point and bounding box
66+
point_wkt = Point(point_lon, point_lat).wkt
67+
68+
# Bounding box for pre-filtering
69+
bbox_min_lon = point_lon - buffer_degrees
70+
bbox_max_lon = point_lon + buffer_degrees
71+
bbox_min_lat = point_lat - buffer_degrees
72+
bbox_max_lat = point_lat + buffer_degrees
73+
74+
print(f"Checking intersection for: {point_wkt}")
75+
print(f"Using bounding box filter: [{bbox_min_lon:.6f}, {bbox_min_lat:.6f}, {bbox_max_lon:.6f}, {bbox_max_lat:.6f}]")
76+
77+
try:
78+
print("Running optimized spatial query...")
79+
result = duckdb.sql(f"""
80+
SELECT 1
81+
FROM (
82+
SELECT geometry
83+
FROM read_parquet('{parquet_path}')
84+
)
85+
WHERE
86+
-- Fast bounding box pre-filter using geometry bounds
87+
ST_XMin(geometry) <= {bbox_max_lon}
88+
AND ST_XMax(geometry) >= {bbox_min_lon}
89+
AND ST_YMin(geometry) <= {bbox_max_lat}
90+
AND ST_YMax(geometry) >= {bbox_min_lat}
91+
-- Perform expensive intersection
92+
AND ST_Intersects(geometry, ST_GeomFromText('{point_wkt}'))
93+
LIMIT 1
94+
""").fetchone()
95+
96+
intersects = result is not None
97+
98+
result = {
99+
"longitude": point_lon,
100+
"latitude": point_lat,
101+
"flood_zone": intersects
102+
}
103+
104+
return result
105+
106+
except Exception as e:
107+
raise RuntimeError(f"{e.__class__.__name__}: {str(e)}")
108+
109+
110+
# if __name__ == "__main__":
111+
# start_time = time.time()
112+
113+
# point_lon=12.772144
114+
# point_lat=45.562546
115+
116+
117+
# result = get_flood_information(
118+
# point_lon=point_lon,
119+
# point_lat=point_lat,
120+
# parquet_path=FLOOD_DATA)
121+
122+
# elapsed = time.time() - start_time
123+
# print(f"Total time taken: {elapsed:.2f}s - Result: {result}")
124+

api/crud/utils/__init__.py

Whitespace-only changes.

api/crud/utils/duckdb.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import duckdb
2+
3+
def setup_spatial_extensions():
4+
"""Install and load spatial extension with performance settings"""
5+
try:
6+
duckdb.sql("INSTALL spatial;")
7+
duckdb.sql("LOAD spatial;")
8+
9+
# Performance optimizations
10+
duckdb.sql("SET threads=2;") # Use multiple threads
11+
duckdb.sql("SET memory_limit='2GB';") # Increase memory limit
12+
duckdb.sql("SET max_memory='2GB';")
13+
14+
print("Spatial extension loaded with performance settings")
15+
except Exception as e:
16+
print(f"Note: Spatial extension setup: {e}")

api/crud/utils/utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
def validate_coordinates(longitude:float,
2+
latitude:float) -> tuple:
3+
"""Validate and normalize coordinate inputs"""
4+
if isinstance(longitude, (float)) and isinstance(latitude, (float)):
5+
return [longitude], [latitude]
6+
raise ValueError("Coordinates must be floats")

api/route/__init__.py

Whitespace-only changes.

api/route/flood.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from fastapi import APIRouter, HTTPException
2+
from api.crud.flood import get_flood_information
3+
from schema.flood import FloodDataResponse
4+
5+
import sys
6+
import os
7+
sys.path.append(os.path.dirname(__file__))
8+
9+
10+
flood_route = APIRouter()
11+
12+
@flood_route.get("/flood-risk",
13+
response_model=FloodDataResponse,
14+
status_code=200,
15+
tags=["Flood Risk"],
16+
description="Check if a particular location is flooded or not")
17+
async def get_flood_info(longitude: float, latitude: float) -> FloodDataResponse:
18+
"""Check if the latitudes and longitudes intersect with flood areas and return info as JSON"""
19+
try:
20+
data = get_flood_information(longitude, latitude)
21+
if not data:
22+
raise HTTPException(
23+
status_code=404,
24+
detail=f"No flood risk data found for location ({latitude}, {longitude})."
25+
)
26+
return data
27+
except ValueError as ve:
28+
raise HTTPException(status_code=400, detail=str(ve))
29+
except Exception:
30+
raise HTTPException(status_code=500, detail="Internal server error.")

core/__init__.py

Whitespace-only changes.

core/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
FLOOD_DATA = r'C:\Users\Admin\Desktop\ongoing_tasks\environtrust\data\flood.parquet'
2+

0 commit comments

Comments
 (0)