
Commit 3660452

Project finished
1 parent 12752d6 commit 3660452

18 files changed (+100326, -0 lines)

README.md

Lines changed: 76 additions & 0 deletions
# Data Engineering Project with HDFS and Kafka

![](/images/map.png)

A project that builds a data pipeline using data from the Hepsiburada data engineering case study.
* [docker-compose.yml](/docker-compose.yml)

* [config-hadoop](/config-hadoop)

* [Producer](/docker/producer/)
  * [Dockerfile](/docker/producer/Dockerfile)
  * [HB data](/docker/producer/hb-data.json)
  * [Kafka producer](/docker/producer/kafka_producer.py)
  * [requirements](/docker/producer/requirements.txt)

* [Consumer](/docker/consumer/)
  * [Dockerfile](/docker/consumer/Dockerfile)
  * [Kafka consumer](/docker/consumer/kafka_consumer.py)
  * [requirements](/docker/consumer/requirements.txt)
  * [HDFS](/docker/consumer/hdfs.py)
### Steps

Launch an Ubuntu machine on AWS EC2 for the project.

![](/images/instance.png)
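
If you prefer the CLI to the AWS console, an instance can be launched roughly like this; the AMI ID, key pair, and security group below are placeholders, not values from this project.

```bash
# Hypothetical example: launch an Ubuntu instance from the CLI.
# Replace the AMI ID, key pair, and security group with your own values.
aws ec2 run-instances \
  --image-id ami-xxxxxxxxxxxxxxxxx \
  --instance-type t3.large \
  --key-name my-key \
  --security-group-ids sg-xxxxxxxx
```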

Open the required ports in the instance's security group (firewall).

![](/images/ingress.png)
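
The same ingress rules can also be added from the CLI; this is a sketch with a placeholder security-group ID, opening the three UI ports used below.

```bash
# Hypothetical example: allow inbound TCP on the UI ports used by this project.
for port in 9870 8080 8088; do
  aws ec2 authorize-security-group-ingress \
    --group-id sg-xxxxxxxx \
    --protocol tcp \
    --port "$port" \
    --cidr 0.0.0.0/0
done
```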

You also need to open the same ports at the operating-system level:

```bash
sudo ufw allow 9870
sudo ufw allow 8080
sudo ufw allow 8088
```
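
To confirm the rules are in place, list them:

```bash
sudo ufw status numbered
```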

Then bring up the stack with Docker Compose.
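
Note that `docker-compose.yml` declares `kafka-network` as an external network, so it has to exist before the containers start; create it once:

```bash
docker network create kafka-network
```

With the network in place, build and start the services: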

```bash
docker-compose up --build
```

About a minute after the containers come up, the producer starts writing data to the Kafka topic and the pipeline becomes active.
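
One way to check that messages are flowing is to tail the consumer's logs, or to read the `hb` topic directly inside the broker container (container and topic names as defined in this project):

```bash
# Follow the consumer's output
docker logs -f consumer

# Or read the topic from the Kafka container
docker exec -it kafka kafka-console-consumer \
  --bootstrap-server kafka:29092 \
  --topic hb --from-beginning
```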

Data from the Kafka topic. IP:8080 or [0.0.0.0:8080](http://0.0.0.0:8080)

![](/images/kafka_ui.png)

Hadoop HDFS interface. IP:9870 or [0.0.0.0:9870](http://0.0.0.0:9870)

![](/images/hdfs_datanode.png)

Data from HDFS. IP:9870 or [0.0.0.0:9870](http://0.0.0.0:9870)

![](/images/hdfs_data.png)
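
The JSON files written by the consumer land under `/data` and can also be listed over WebHDFS, which is enabled in `config-hadoop` (replace `localhost` with the instance IP when querying remotely):

```bash
curl "http://localhost:9870/webhdfs/v1/data?op=LISTSTATUS"
```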

Hadoop cluster interface. IP:8088 or [0.0.0.0:8088](http://0.0.0.0:8088)

![](/images/hadoop.png)
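
The ResourceManager also exposes a REST API on the same port, which is handy for a quick health check without the UI:

```bash
curl "http://localhost:8088/ws/v1/cluster/info"
```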
[Ahmet Furkan Demir](https://ahmetfurkandemir.com/)

config-hadoop

Lines changed: 29 additions & 0 deletions
CORE-SITE.XML_fs.default.name=hdfs://namenode
CORE-SITE.XML_fs.defaultFS=hdfs://namenode
HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
HDFS-SITE.XML_dfs.webhdfs.enabled=true
HDFS-SITE.XML_dfs.permissions=false
HDFS-SITE.XML_dfs.replication=1
MAPRED-SITE.XML_mapreduce.framework.name=yarn
MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false

docker-compose.yml

Lines changed: 135 additions & 0 deletions
---
version: '2'
services:
  namenode:
    image: apache/hadoop:3
    hostname: namenode
    command: ["hdfs", "namenode"]
    depends_on:
      - resourcemanager
      - nodemanager
    ports:
      - 9870:9870
    env_file:
      - ./config-hadoop
    environment:
      ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
    networks:
      - kafka-network
    volumes:
      - hadoop:/home/
    restart: on-failure

  datanode:
    image: apache/hadoop:3
    command: ["hdfs", "datanode"]
    depends_on:
      - namenode
    env_file:
      - ./config-hadoop
    networks:
      - kafka-network
    volumes:
      - hadoop:/home/
    restart: on-failure

  resourcemanager:
    image: apache/hadoop:3
    hostname: resourcemanager
    command: ["yarn", "resourcemanager"]
    ports:
      - 8088:8088
    env_file:
      - ./config-hadoop
    volumes:
      - ./test.sh:/opt/test.sh
    networks:
      - kafka-network
    restart: on-failure

  nodemanager:
    image: apache/hadoop:3
    hostname: nodemanager
    command: ["yarn", "nodemanager"]
    env_file:
      - ./config-hadoop
    networks:
      - kafka-network
    volumes:
      - hadoop:/home/
    restart: on-failure

  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper:5.0.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - kafka-network
    volumes:
      - hadoop:/home/
    restart: on-failure

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    environment:
      KAFKA_CLUSTERS_0_NAME: kafka
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      DYNAMIC_CONFIG_ENABLED: 'true'
    networks:
      - kafka-network

  kafka:
    hostname: kafka
    container_name: kafka
    image: confluentinc/cp-kafka:5.0.0
    depends_on:
      - zookeeper
    networks:
      - kafka-network
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    volumes:
      - hadoop:/home/
    restart: on-failure

  producer:
    container_name: producer
    build: ./docker/producer
    volumes:
      - .:/code
    depends_on:
      - kafka
    networks:
      - kafka-network
    restart: on-failure

  consumer:
    container_name: consumer
    build: ./docker/consumer
    volumes:
      - .:/code
    depends_on:
      - kafka
    networks:
      - kafka-network
    restart: on-failure

networks:
  kafka-network:
    external: true

volumes:
  hadoop:

docker/consumer/Dockerfile

Lines changed: 8 additions & 0 deletions
FROM python:3.7
WORKDIR /code

RUN python3 -m pip install --upgrade pip
COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt
COPY . .
CMD [ "python3", "docker/consumer/kafka_consumer.py" ]

docker/consumer/hdfs.py

Lines changed: 17 additions & 0 deletions
import pyhdfs
import uuid

# Connect to the namenode's WebHDFS endpoint as the hdfs user.
hdfs = pyhdfs.HdfsClient(hosts="namenode:9870", user_name="hdfs")

userhomedir = hdfs.get_home_directory()
print(userhomedir)
availablenode = hdfs.get_active_namenode()
print(availablenode)
print(hdfs.listdir("/"))

# Target directory for incoming messages.
hdfs.mkdirs('/data')
print(hdfs.list_status('/data'))


def write_to_hdfs(json_str):
    # Write each message to its own uniquely named JSON file under /data.
    hdfs.create("/data/{}.json".format(str(uuid.uuid1())), json_str)

docker/consumer/kafka_consumer.py

Lines changed: 28 additions & 0 deletions
from kafka import KafkaConsumer
from json import loads
import time

# Give the Kafka broker time to come up before connecting.
time.sleep(50)

# Importing hdfs also connects to the namenode and creates /data.
import hdfs

try:
    consumer = KafkaConsumer(
        'hb',
        bootstrap_servers=['kafka:29092'],
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='my-group',
        value_deserializer=lambda x: loads(x.decode('utf-8')))
except Exception as exc:
    raise Exception('kafka connect error') from exc

# Write every consumed message to HDFS as a JSON file.
for message in consumer:
    message = message.value
    hdfs.write_to_hdfs(str(message))
    print(message)

docker/consumer/requirements.txt

Lines changed: 2 additions & 0 deletions
kafka-python
PyHDFS

docker/producer/Dockerfile

Lines changed: 8 additions & 0 deletions
FROM python:3.7
WORKDIR /code

RUN python3 -m pip install --upgrade pip
COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt
COPY . .
CMD [ "python3", "docker/producer/kafka_producer.py" ]
