The Mapreduce API is great. We've finally got a tool that can process tasks taking more than 30 seconds. Yeaaaahhh! This is a huge improvement; I wish we had had these tools months ago. All the examples in the documentation use the webapp framework, and there aren't many examples on the internet that use the Django helper. This post is about that.
mapreduce.yaml:
mapreduce:
- name: Delete SearchableTowns
  mapper:
    input_reader: mapreduce.input_readers.DatastoreInputReader
    handler: main_map_reduce.delete_searchable_towns
    params:
    - name: entity_kind
      default: mapreduce_models.SearchableTown
- name: Create SearchableTown from Town
  mapper:
    input_reader: mapreduce.input_readers.DatastoreInputReader
    handler: main_map_reduce.town_to_searchable
    params:
    - name: entity_kind
      default: mapreduce_models.Town
- name: Create Town and SearchableTown from csv for USA
  mapper:
    input_reader: mapreduce.input_readers.BlobstoreLineInputReader
    handler: main_map_reduce.csv_to_towns
    params:
    - name: blob_keys
      default: AMIfv97g-x4G9-KM24YXQi6dSyBddAb97p0n98NgJlCL68jJA9jcvwETojEcF7MGGlZsDLEFVcJeeLHGgwxo9Nlay9GR33LniA06Obw3C781Te9yAn9Dk1EkwxjrFqHEBo4-WbZ7GUS9nKa3NOpDGdbxBBkD2sTYUg
The file contains 3 tasks. Two of them create or modify datastore entities. The other one reads a big csv from the blobstore, creating a datastore entity for every line in the file. This is the Python version of this blog post (which uses Java).
Now, main_map_reduce is a python file that I keep in the same location as mapreduce.yaml. Just a regular python file. The imports in that file might cause exceptions, especially if they try to load Django stuff. In order to avoid problems we had to copy our models.py into mapreduce_models.py, removing almost all the imports. As mapreduce_models.py is placed at the same level as mapreduce.yaml, we also had to hack the file appengine_django/models.py, replacing this line:
    self.app_label = model_module.__name__.split('.')[-2]
With this block:
    self.app_label = 'my_app_name'
    try:
        self.app_label = model_module.__name__.split('.')[-2]
    except IndexError:
        pass
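To see why the fallback is needed, here is a small illustration (hypothetical, not from the patch itself): a Django app module name like 'my_app.models' has the app label at index [-2], but a top-level module such as mapreduce_models contains no dots, so [-2] raises IndexError.

```python
# Normal Django case: the module lives inside an app package.
print('my_app.models'.split('.')[-2])      # -> my_app

# Our case: mapreduce_models sits next to mapreduce.yaml, with no package.
try:
    'mapreduce_models'.split('.')[-2]
except IndexError:
    print('IndexError: fall back to the hard-coded app label')
```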
main_map_reduce.py:
from mapreduce import operation as op

import mapreduce_models as models


def delete_searchable_towns(town_entity):
    yield op.db.Delete(town_entity)


def town_to_searchable(town_entity):
    searchable = models.SearchableTown()
    searchable.code = town_entity.code
    searchable.lower_name = town_entity.name.lower()
    yield op.db.Put(searchable)


def csv_to_towns(input_tuple):
    offset, line = input_tuple
    # process the line, building a town_entity from it ...
    yield op.db.Put(town_entity)
In the first two functions, the mapreducer passes in an entity. In the last one, it passes a tuple whose first item is the byte offset and whose second item is the line read from the blob, which is a big csv file.
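For illustration, here is one way the line parsing inside csv_to_towns might look. The column layout (code, name) and the Town stand-in class are assumptions for the sketch, not our real model:

```python
import csv


class Town(object):
    """Stand-in for the real mapreduce_models.Town (fields assumed)."""
    def __init__(self, code, name):
        self.code = code
        self.name = name


def parse_town_line(input_tuple):
    # BlobstoreLineInputReader yields (byte_offset, line) tuples.
    offset, line = input_tuple
    # csv.reader copes with quoted commas, e.g. '001,"Springfield, IL"'.
    row = next(csv.reader([line]))
    return Town(code=row[0], name=row[1])
```

In the real handler you would build the entity like this and then `yield op.db.Put(...)` it, as in the snippet above.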
This way, we can now upload a huge csv and then create entities from it. This task used to be really painful, as we had to make a ton of dirty hacks in order to avoid the 30-second restriction.