Commit 29d45847 authored by abrahaja

MapReduce Server

#!/bin/bash
#
# mapreduce
#
# Start, stop, restart, or check the status of the mapreduce server.
#
# Stop on errors, print commands
# See https://vaneyckt.io/posts/safer_bash_scripts_with_set_euxo_pipefail/
set -Eeuo pipefail
set -x
usage() {
echo "Usage: $0 (start|stop|status|restart)"
}
start() {
# Start server
if pgrep -f mapreduce-master || pgrep -f mapreduce-worker ; then
echo "Error: mapreduce-master is already running"
exit 1
else
echo '{"message_type": "register"}'
mapreduce-master 6000 &
sleep 2
mapreduce-worker 6000 6001 &
mapreduce-worker 6000 6002 &
fi
}
stop() {
# Stop server
# Detect GNU vs BSD netcat. We need netcat to close the connection after
# sending a message, which requires different options.
set +o pipefail # Avoid erroneous failures due to grep returning non-zero
if nc -h 2>&1 | grep -q "\-c"; then
NC="nc -c"
elif nc -h 2>&1 | grep -q "\-N"; then
NC="nc -N"
elif nc -h 2>&1 | grep -q "\-C"; then
NC="nc -C"
else
echo "Error detecting netcat version."
exit 1
fi
set -o pipefail
echo '{"message_type": "shutdown"}' | $NC localhost 6000 || true
sleep 2 # give the master time to receive signal and send to workers
echo "killing mapreduce master ..."
pkill -f mapreduce-master || true
echo "killing mapreduce worker ..."
pkill -f mapreduce-worker || true
}
if [ $# -ne 1 ]; then
usage
exit 1
fi
case $1 in
"start")
start
;;
"stop")
stop
;;
"status")
if pgrep -f mapreduce-master; then
echo "master running"
else
echo "master not running"
fi
if pgrep -f mapreduce-worker; then
echo "workers running"
else
echo "workers not running"
fi
;;
"restart")
stop
start
;;
*)
usage
exit 1
;;
esac
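For reference, a session with this script might look roughly like the following (a sketch, assuming the script is saved as bin/mapreduce and marked executable; pgrep and set -x trace output omitted for brevity):

    $ ./bin/mapreduce start
    $ ./bin/mapreduce status
    master running
    workers running
    $ ./bin/mapreduce stop
    killing mapreduce master ...
    killing mapreduce worker ...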
"""Example TCP socket client."""
import socket
import json
def main():
"""Test TCP Socket Client."""
# create an INET, STREAMing socket, this is TCP
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# connect to the server
sock.connect(("localhost", 8000))
# send a message
message = json.dumps({"hello": "world"})
sock.sendall(message.encode('utf-8'))
sock.close()
if __name__ == "__main__":
main()
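One way to exercise this client without the full server example below is a netcat listener (a sketch, assuming the client above is saved as example_client.py; as noted in the stop() helper earlier, the listen flags differ between netcat variants):

    $ nc -l 8000 &              # BSD/OpenBSD netcat; GNU netcat uses: nc -l -p 8000
    $ python3 example_client.py
    {"hello": "world"}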
"""Example TCP socket server."""
import socket
import json
def main():
"""Test TCP Socket Server."""
# Create an INET, STREAMing socket, this is TCP
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Bind the socket to the server
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("localhost", 8000))
sock.listen()
# Connect to a client
clientsocket, address = sock.accept()
print("Connection from", address[0])
# Receive data, one chunk at a time. When the client closes the
# connection, recv() returns empty data, which breaks out of the loop. We
# make a simplifying assumption that the client will always cleanly close
# the connection.
message_chunks = []
while True:
data = clientsocket.recv(4096)
if not data:
break
message_chunks.append(data)
clientsocket.close()
# Decode UTF8 and parse JSON data
message_bytes = b''.join(message_chunks)
message_str = message_bytes.decode("utf-8")
message_dict = json.loads(message_str)
print(message_dict)
if __name__ == "__main__":
main()
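Run together, the client and server examples behave roughly like this (a sketch, assuming the files are saved as example_server.py and example_client.py):

    $ python3 example_server.py &
    $ python3 example_client.py
    Connection from 127.0.0.1
    {'hello': 'world'}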
"""Example threading."""
import os
import threading
def main():
"""Test Threading."""
threads = []
for i in range(5):
thread = threading.Thread(target=worker, args=(i,))
threads.append(thread)
thread.start()
def worker(worker_id):
"""Worker thread."""
pid = os.getpid()
print("Worker {} pid={}".format(worker_id, pid))
while True:
# spin
pass
if __name__ == "__main__":
main()
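Note that these worker threads spin forever, so the process only exits when killed. A common variation (a sketch, not part of this commit) gives each worker a bounded amount of work and has the main thread wait for completion with join():

    """Example threading with join()."""
    import threading

    def worker(worker_id):
        """Worker thread that finishes on its own."""
        print("Worker {} done".format(worker_id))

    def main():
        """Start five workers and wait for all of them."""
        threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()  # block until every worker thread has finished

    if __name__ == "__main__":
        main()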
"""
Mapreduce python package.
"""
from mapreduce.master import Master
from mapreduce.worker import Worker
"""
Master python package emulating a master node for distributed mapreduce.
"""
from mapreduce.master.__main__ import Master
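The mapreduce-master and mapreduce-worker commands used by the shell script above are presumably exposed as console-script entry points. A minimal sketch of that wiring, assuming standard setuptools packaging (hypothetical; no setup.py appears in this commit, and the module paths are guesses):

    # setup.py -- hypothetical sketch, not part of this commit
    from setuptools import setup, find_packages

    setup(
        name="mapreduce",
        version="0.1.0",
        packages=find_packages(),
        install_requires=["click"],
        entry_points={
            "console_scripts": [
                "mapreduce-master = mapreduce.master.__main__:main",
                "mapreduce-worker = mapreduce.worker.__main__:main",
            ],
        },
    )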
"""Mapreduce server."""
import os
import logging
import json
import time
import pathlib
import glob
import shutil
import threading
import queue
import heapq
import click
import mapreduce.utils as utils
# import socket
# Configure logging
logging.basicConfig(level=logging.DEBUG)
def shutdown_worker(worker_port):
"""Shutdown worker."""
message = {"message_type": "shutdown"}
utils.client_send_data(worker_port, message)
def job_generator(message_dict, tmp_dir):
"""Fault manager."""
job_dict = {
"tasks": queue.SimpleQueue(),
"input_directory": message_dict["input_directory"],
"output_directory": message_dict["output_directory"],
"mapper_executable": message_dict["mapper_executable"],
"reducer_executable": message_dict["reducer_executable"],
"num_mappers": message_dict["num_mappers"],
"num_reducers": message_dict["num_reducers"],
"job_id": message_dict["job_id"],
"tmp_output_dir": tmp_dir,
}
return job_dict
def worker_tasks_empty(worker):
"""Return bool worker tasks empty."""
if worker["status"] != "dead":
if worker["task_files"] != []:
return False
if worker["sort_list"] != []:
return False
return True
class Master:
"""Master class."""
shutdown = False
stage = "idle"
own_port = 0
workers = []
num_ready_workers = 0
group_intermediate = False
current_job = {}
job_counter = 0
job_queue = queue.SimpleQueue()
def heart_beat_listen(self, port_num):
"""Listen for hb."""
sock = utils.udp_server_create(port_num)
sock.settimeout(1)
message_dict = None
while not self.shutdown:
message_dict = utils.udp_receive(sock)
if message_dict == -1:
continue
if message_dict["message_type"] != "heartbeat":
continue
# print("message_dict: ", message_dict)
for worker in self.workers:
if worker["pid"] == message_dict["worker_pid"]:
# print("worker exists")
worker["num_missed_heartbeats"] = 0
worker["time_since_last_hb"] = time.time()
# print("ended for loop")
time.sleep(0.1)
# print("finished sleeping")
sock.close()
def kill_worker(self, worker_index):
"""Kill worker."""
worker = self.workers[worker_index]
if worker["status"] == "busy":
self.current_job["tasks"].put_nowait(worker["task_files"])
assignee_idx = 0
for worker_ in self.workers:
if worker_["status"] == "ready":
self.assign_another_task(assignee_idx)
break
assignee_idx += 1
elif worker["status"] == "ready":
self.num_ready_workers -= 1
self.workers[worker_index]["status"] = "dead"
print("worker {} died".format(worker_index))
def fault_manager(self, worker_index):
"""Fault manager."""
worker = self.workers[worker_index]
while worker["status"] != "dead" and not self.shutdown:
# print("fault manager running")
elapsed_time = time.time() - worker["time_since_last_hb"]
if elapsed_time >= 2:
worker["time_since_last_hb"] = time.time()
worker["num_missed_heartbeats"] += 1
if worker["num_missed_heartbeats"] == 5:
self.kill_worker(worker_index)
else:
time.sleep(0.1)
print("Fault manager ending \n\n\n")
def register_worker(self, message):
"""Register worker."""
worker_index = len(self.workers)
print("registering worker")
self.workers.append({"pid": message["worker_pid"],
"port": message["worker_port"],
"status": "ready",
"sort_list": [],
"task_files": [],
"num_missed_heartbeats": 0,
"time_since_last_hb": time.time()
})
logging.info("Master:%s registered worker localhost:%s PID %s",
self.own_port, message["worker_port"],
message["worker_pid"])
message["message_type"] = "register_ack"
worker_port = message["worker_port"]
self.num_ready_workers += 1
utils.client_send_data(worker_port, message)
if self.stage != "idle":
self.assign_another_task(worker_index)
print("starting fault manager")
ft_thread = threading.Thread(target=self.fault_manager,
args=(worker_index,))
ft_thread.start()
def assign_tasks(self, num_workers, executable, output_dir):
"""Fault manager."""
num_workers1 = min(num_workers, self.num_ready_workers)
i = 0
for worker in self.workers:
if i < num_workers1 and worker["status"] == "ready":
job_tasks = self.current_job["tasks"].get_nowait()
worker["task_files"] = job_tasks
message = {
"message_type": "new_worker_task",
"input_files": job_tasks,
"executable": executable,
"output_directory": output_dir,
"worker_pid": worker["pid"]
}
utils.client_send_data(worker["port"], message)
worker["status"] = "busy"
self.num_ready_workers -= 1
i += 1
elif i == num_workers1:
return
return
def partion_inputs(self, num_workers, files_list):
"""Fault manager."""
num_tasks = num_workers
tasks = self.current_job["tasks"]
i = 0
while i < num_tasks:
tasks.put([])
i += 1
# input_dir = pathlib.Path(files_dir)
# job_files = glob.glob(str(input_dir/"*"))
job_files = sorted(files_list)
self.current_job["job_files"] = job_files
for job_file in job_files:
# job_file = pathlib.PurePath(job_file)
task = tasks.get()
task.append(job_file)
tasks.put(task)
self.current_job["tasks"] = tasks
def start_map_stage(self):
"""Fault manager."""
logging.info("Master:%s begin map stage", self.own_port)
self.stage = "map"
self.current_job = self.job_queue.get()
num_tasks = self.current_job["num_mappers"]
directory = pathlib.Path(self.current_job["input_directory"])
job_files = glob.glob(str(directory/"*"))
self.partion_inputs(num_tasks, job_files)
executable = self.current_job["mapper_executable"]
id_job = self.current_job["job_id"]
output_dir = "tmp/job-{}/mapper-output".format(id_job)
self.assign_tasks(num_tasks, executable, output_dir)
def handle_new_job(self, message_dict):
"""Fault manager."""
logging.info(
"Master:%s new job number %s",
self.own_port, self.job_counter
)
new_job_dir = pathlib.Path("tmp/job-{}".format(self.job_counter))
# self.current_job["tmp_output_dir"] = new_job_dir
message_dict["job_id"] = self.job_counter
self.job_counter += 1
new_job_dir.mkdir()
mapper = new_job_dir/"mapper-output"
grouper = new_job_dir/"grouper-output"
reducer = new_job_dir/"reducer-output"
mapper.mkdir()
grouper.mkdir()
reducer.mkdir()
job = job_generator(message_dict, new_job_dir)
self.job_queue.put(job)
if self.num_ready_workers > 0 and self.stage == "idle":
self.start_map_stage()
def start_group_stage(self, message):
"""Fault manager."""
directory = os.path.dirname(message["output_files"][0])
mapped_files = glob.glob(str(directory + "/*"))
mapped_files = sorted(mapped_files)
# mapped_files = message["output_files"]
self.group_intermediate = True
next_worker = 0
ready_workers = []
for worker in self.workers:
if worker["status"] == "ready":
ready_workers.append(worker)
for file_ in mapped_files:
if next_worker == len(ready_workers):
next_worker = 0
ready_workers[next_worker]["sort_list"].append(file_)
next_worker += 1
print("FLAG")
for idx, worker in enumerate(self.workers):
if worker["status"] == "ready":
temp = idx + 1
if temp < 10:
temp = str(0) + str(temp)
gout = os.path.join(
self.current_job["tmp_output_dir"],
"grouper-output/sorted{}".format(str(temp)))
message = {
"message_type": "new_sort_task",
"input_files": worker["sort_list"],
"output_file": gout,
"worker_pid": worker["pid"]
}
utils.client_send_data(worker["port"], message)
worker["status"] = "busy"
self.num_ready_workers -= 1
def end_group_stage(self, message):
"""Fault manager."""
self.group_intermediate = False
# num_workers = self.current_job["num_reducers"]
file_path = os.path.dirname(message["output_file"])
sorted_files = glob.glob(str(file_path + "/*"))
out = []
for input_ in sorted_files:
fin = open(input_, 'r')
out.append(fin)
list_of_fds = []
i = 0
while i < self.current_job["num_reducers"]:
if i < 10:
num = str(0) + str(i + 1)
else:
num = str(i + 1)
f_1 = open(file_path + "/reduce{}".format(num), 'a+')
list_of_fds.append(f_1)
i += 1
num_unique_keys = -1
cur_key = None
for line in heapq.merge(*out):
if cur_key != line.split("\t")[0]:
num_unique_keys += 1
cur_key = line.split("\t")[0]
reduce_file_num = (num_unique_keys
% self.current_job["num_reducers"])
list_of_fds[reduce_file_num].write(line)
for f_1 in list_of_fds:
f_1.close()
for fin in out:
fin.close()
def start_reduce_stage(self, message):
"""Fault manager."""
num_tasks = self.current_job["num_reducers"]
directory = os.path.dirname(message["output_file"])
sorted_files = glob.glob(str(directory + "/reduce*"))
self.partion_inputs(num_tasks, sorted_files)
executable = self.current_job["reducer_executable"]
id_job = self.current_job["job_id"]
output_dir = "tmp/job-{}/reducer-output".format(id_job)
self.assign_tasks(num_tasks, executable, output_dir)
def wrapping_up(self):
"""Fault manager."""
id_job = self.current_job["job_id"]
source_dir = "tmp/job-{}/reducer-output".format(id_job)
target_dir = self.current_job["output_directory"]
os.mkdir(target_dir)
file_names = os.listdir(source_dir)
for file_ in file_names:
temp = file_.replace("reduce", "outputfile")
os.rename(os.path.join(source_dir, file_),
os.path.join(source_dir, temp))
file_names = os.listdir(source_dir)
for file_ in file_names:
shutil.move(os.path.join(source_dir, file_), target_dir)
logging.info("Master:%s finished job. Output directory: %s",
self.own_port, target_dir)
if not self.job_queue.empty():
self.start_map_stage()
else:
for worker in self.workers:
if worker["status"] != "dead":
worker["status"] = "ready"
self.num_ready_workers += 1
self.stage = "idle"
def stage_complete(self):
"""Fault manager."""
tasks = self.current_job["tasks"]
# print(tasks.empty())
# for w in self.workers:
# print(w)
if not tasks.empty():
return False
if any(not worker_tasks_empty(d) for d in self.workers):
# print("stage {} incomplete".format(self.stage))
return False
# print("stage {} complete".format(self.stage))
return True
def assign_another_task(self, worker_index):
"""Fault manager."""
job_tasks = self.current_job["tasks"].get_nowait()
self.workers[worker_index]["task_files"] = job_tasks
id_job = self.current_job["job_id"]
new_message = {
"message_type": "new_worker_task",
"input_files": job_tasks,
"executable": self.current_job["mapper_executable"],
"output_directory": "tmp/job-{}/mapper-output".format(id_job),
"worker_pid": self.workers[worker_index]["pid"]
}
if self.stage == "reduce":
new_message["executable"] = self.current_job["reducer_executable"]
new_message["output_directory"] = "tmp"
"/job-{}/reducer-output".format(self.current_job["job_id"])
self.workers[worker_index]["status"] = "busy"
self.num_ready_workers -= 1
utils.client_send_data(self.workers[worker_index]["port"], new_message)
print("sent new task to worker")
def handle_status_message(self, message):
"""Fault manager."""
if message["status"] != "finished":
return
worker_index = -1
for i, dic in enumerate(self.workers):
if dic["pid"] == message["worker_pid"]:
worker_index = i
# print("worker {} finished a task".format(worker_index))
# worker_index = self.workers.index(message["worker_pid"])
self.workers[worker_index]["status"] = "ready"
self.num_ready_workers += 1
self.workers[worker_index]["task_files"] = []
self.workers[worker_index]["sort_list"] = []
# tasks = self.current_job["tasks"]
if self.stage_complete():
if self.stage == "map":
logging.info("Master:%s end map stage", self.own_port)
self.stage = "group"
logging.info("Master:%s begin group stage", self.own_port)
self.start_group_stage(message)
elif self.stage == "group":
if self.group_intermediate:
self.end_group_stage(message)
if not self.group_intermediate:
logging.info("Master:%s end group stage", self.own_port)
self.stage = "reduce"
logging.info("Master:%s begin reduce stage", self.own_port)
self.start_reduce_stage(message)
elif self.stage == "reduce":
logging.info("Master:%s end reduce stage", self.own_port)
self.wrapping_up()
return
if not self.current_job["tasks"].empty():
self.assign_another_task(worker_index)
def handle_message(self, message_dict):
"""Fault manager."""
message_type = message_dict["message_type"]
if message_type == "shutdown":
self.shutdown = True
for worker in self.workers:
if worker["status"] != "dead":
shutdown_worker(worker["port"])
elif message_type == "register":
self.register_worker(message_dict)
elif message_type == "new_master_job":
self.handle_new_job(message_dict)
elif message_type == "status":
self.handle_status_message(message_dict)
def master_listen(self):
"""Fault manager."""
sock = utils.server_sock_create(self.own_port)
while