Post
Algorithm Notes: External Sorting And Merging
外排序和K路归并算法
外排序算法(External Sorting) 外排序算法是指在内存不够的情况下,如何对存储在一个或者多个大文件中的数据进行排序的算法。外排序算法通常是解决一些大数据处理问题的第一个步骤,或者是面试官所会考察的算法基本功。外排序算法是海量数据处理算法中十分重要的一块。 在学习这类大数据算法时,经常要考虑到内存、缓存、准确度等因素,这和我们之前见到的算法都略有差别。
外排序算法分为两个基本步骤:
- 将大文件切分为若干个个小文件,并分别使用内存排好序
- 使用K路归并算法(k-way merge)将若干个排好序的小文件合并到一个大文件中
第一步:文件拆分 根据内存的大小,尽可能多的分批次的将数据 Load 到内存中,并使用系统自带的内存排序函数(或者自己写个快速排序算法),将其排好序,并输出到一个个小文件中。比如一个文件有1T,内存有1G,那么我们就这个大文件中的内容按照 1G 的大小,分批次的导入内存,排序之后输出得到 1024 个 1G 的小文件。
第二步:K路归并算法 K路归并算法使用的是数据结构堆(Heap)来完成的,使用 Java 或者 C++ 的同学可以直接用语言自带的 PriorityQueue(C++中叫priority_queue)来代替。
我们将 K 个文件中的第一个元素加入到堆里,假设数据是从小到大排序的话,那么这个堆是一个最小堆(Min Heap)。每次从堆中选出最小的元素,输出到目标结果文件中,然后如果这个元素来自第 x 个文件,则从第 x 个文件中继续读入一个新的数进来放到堆里,并重复上述操作,直到所有元素都被输出到目标结果文件中。
Follow up: 一个个从文件中读入数据,一个个输出到目标文件中操作很慢,如何优化? 如果我们每个文件只读入1个元素并放入堆里的话,总共只用到了 1024 个元素,这很小,没有充分的利用好内存。另外,单个读入和单个输出的方式也不是磁盘的高效使用方式。因此我们可以为输入和输出都分别加入一个缓冲(Buffer)。假如一个元素有10个字节大小的话,1024 个元素一共 10K,1G的内存可以支持约 100K 组这样的数据,那么我们就为每个文件设置一个 100K 大小的 Buffer, 每次需要从某个文件中读数据,都将这个 Buffer 装满。当然 Buffer 中的数据都用完的时候,再批量的从文件中读入。输出同理,设置一个 Buffer 来避免单个输出带来的效率缓慢。 那下面我们就来熟悉下两路归并和K路归并的算法。
例题 1 6 Merge Two Sorted Arrays
class Solution:
"""
@param A: sorted integer array A
@param B: sorted integer array B
@return: A new sorted integer array
"""
def mergeSortedArray(self, A, B):
# write your code here
i, j = 0, 0
merged = []
while i < len(A) and j < len(B):
if A[i] < B[j]:
merged.append(A[i])
i += 1
else:
merged.append(B[j])
j += 1
while i < len(A):
merged.append(A[i])
i += 1
while j < len(B):
merged.append(B[j])
j += 1
# 或者这样:
# if i < len(A):
# merged += A[i:]
# if j < len(B):
# merged += B[j:]
return mergedFollow Up:LeetCode-88 Merge Sorted Array
把小数组合并到大数组。希望用O(N + M)的最优时间复杂度来完成。那么应该从后往前合并,比大的,放大的。
class Solution:
"""
@param: A: sorted integer array A which has m elements, but size of A is m+n
@param: m: An integer
@param: B: sorted integer array B which has n elements
@param: n: An integer
@return: nothing
"""
def mergeSortedArray(self, A, m, B, n):
# write your code here
pos = m + n - 1
i = m - 1
j = n - 1
while i >= 0 and j >= 0 :
if A[i]>B[j] :
A[pos]=A[i]
pos-=1
i-=1
else :
A[pos]=B[j]
pos-=1
j-=1
while i >= 0 :
A[pos] = A[i]
pos-=1
i-=1
while j >= 0:
A[pos] = B[j]
pos-=1
j-=1例题 2 LeetCode-21 Merge Two Sorted Lists
# Definition for singly-linked list.
# class ListNode:
# def __init__(self, val=0, next=None):
# self.val = val
# self.next = next
class Solution:
def mergeTwoLists(self, l1, l2):
dummy = ListNode(0)
temp = dummy
while l1 != None and l2 != None:
if l1.val < l2.val:
temp.next = l1
l1 = l1.next
else:
temp.next = l2
l2 = l2.next
temp = temp.next
if l1 != None:
temp.next = l1
else:
temp.next = l2
return dummy.next例题 3 LeetCode-23 Merge K Sorted Lists
K 路归并算法。(共三种实现方法)
1.利用priority queue的方法可以在O(NlogK)的时间复杂度完成。[Heap]
2.利用二叉树的形式,两两归并,时间复杂度(ONlogK),利用的是merge two sorted lists。[从下往上]
3.分治法解决,(最应该掌握的算法,K路归并算法)[从上往下]
1.Heap做法,需要overwrite以下ListNode类的比较方程(lt_), lessthan function, 这样就能直接把node放进heap里了。
# Definition for singly-linked list.
# class ListNode:
# def __init__(self, val=0, next=None):
# self.val = val
# self.next = next
### overwrite the comparator function first, so that we could directly put ListNode into a heapq
import heapq
ListNode.__lt__ = lambda x, y: (x.val < y.val)
class Solution:
def mergeKLists(self, lists):
if not lists:
return None
dummy = ListNode(None)
tail = dummy
heap = []
for head in lists:
if head:
heapq.heappush(heap, head)
while heap:
head = heapq.heappop(heap)
tail.next = head
tail = head
if head.next:
heapq.heappush(heap, head.next)
return dummy.next
2.从下而上, 两两归并。两两归并,就是用merge two sorted lists就可以了。这题的思路和mergesort算法非常一致。
# Definition for singly-linked list.
# class ListNode:
# def __init__(self, val=0, next=None):
# self.val = val
# self.next = next
class Solution:
def mergeKLists(self, lists):
### 自顶部向下走:
if not lists:
return None
return self.merge_range_lists(lists, 0, len(lists) -1)
def merge_range_lists(self, lists, start, end):
if start == end:
return lists[start]
mid = (start + end) // 2
left = self.merge_range_lists(lists, start, mid)
right = self.merge_range_lists(lists, mid + 1, end)
return self.merge_two_lists(left, right)
def merge_two_lists(self, head1, head2):
tail = dummy = ListNode(None)
while head1 and head2:
if head1.val < head2.val:
tail.next = head1
head1 = head1.next
else:
tail.next = head2
head2 = head2.next
tail = tail.next
if head1:
tail.next = head1
if head2:
tail.next = head2
return dummy.next3.经典算法,k路并归,从上而下。 也是两两并归。
# Definition for singly-linked list.
# class ListNode:
# def __init__(self, val=0, next=None):
# self.val = val
# self.next = next
class Solution:
def mergeKLists(self, lists):
if not lists:
return None
while len(lists) > 1:
next_lists = []
for i in range(0, len(lists), 2):
if i + 1 < len(lists):
new_list = self.merge_two_lists(lists[i], lists[i + 1])
else:
new_list = lists[i]
next_lists.append(new_list)
lists = next_lists
return lists[0]
def merge_two_lists(self, head1, head2):
tail = dummy = ListNode(None)
while head1 and head2:
if head1.val < head2.val:
tail.next = head1
head1 = head1.next
else:
tail.next = head2
head2 = head2.next
tail = tail.next
if head1:
tail.next = head1
if head2:
tail.next = head2
return dummy.next例题 4 839 Merge Two Sorted Interval Lists
算法:贪心+双指针
算法思路
用两个指针index1和index2分别指向list1和list2的头部,然后比较两个区间的左端点,挑出左端点较小的,将其与答案中最后一个区间进行比较,如果可以合并则合并,否则就将那个区间加入到答案最后。直到所有区间都被合并或者加入到答案中为止
代码思路
先进行特判,若其中一个list为空则直接返回另一个list
将两个指针index1和index2分别指向list1和list2的头部,即等于0
比较指针指向的两个区间的左端点,取出较小的一个左端点的区间,将其赋值给变量now,并将该指针右移一位
将now的左端点与答案list中末尾的右端点比较,若小于末尾右端点则将其合并,即last.end = max(last.end, now.end),否则将now 作为一个独立的区间加入到答案list中
重复3、4两个步骤,直到其中一个指针指向末尾
若有指针还未指向末尾,将指针不断右移并将其指向的区间加入或合并至答案中(判断条件与方法与第四步相同)
复杂度分析
N表示list1的长度,M表示list2的长度
空间复杂度:O(N+M)
时间复杂度:O(N+M)
"""
Definition of Interval.
class Interval(object):
def __init__(self, start, end):
self.start = start
self.end = end
"""
class Solution:
"""
@param list1: one of the given list
@param list2: another list
@return: the new sorted list of interval
"""
def mergeTwoInterval(self, list1, list2):
i, j = 0, 0
intervals = []
while i < len(list1) and j < len(list2):
if list1[i].start < list2[j].start:
self.push_back(intervals, list1[i])
i += 1
else:
self.push_back(intervals, list2[j])
j += 1
while i < len(list1):
self.push_back(intervals, list1[i])
i += 1
while j < len(list2):
self.push_back(intervals, list2[j])
j += 1
return intervals
def push_back(self, intervals, interval):
if not intervals:
intervals.append(interval)
return
last_interval = intervals[-1]
if last_interval.end < interval.start:
intervals.append(interval)
return
intervals[-1].end = max(intervals[-1].end, interval.end)
补充: LeetCode-56 Merge Intervals
此题是lists不是tuple,略有不同,但思路解法都是一致的。
class Solution:
def merge(self, intervals):
intervals.sort()
res = []
for interval in intervals:
self.push_back(interval, res)
return res
def push_back(self, interval, res):
if not res:
res.append(interval)
return
last_interval = res[-1]
end_boundary = last_interval[-1]
if end_boundary < interval[0]:
res.append(interval)
return
res[-1][-1] = max(res[-1][-1], interval[-1])
例题 5 577 Merge K Sorted Interval Lists
1.heap 做法:
"""
Definition of Interval.
class Interval(object):
def __init__(self, start, end):
self.start = start
self.end = end
"""
import heapq
class Solution:
"""
@param intervals: the given k sorted interval lists
@return: the new sorted interval list
"""
def mergeKSortedIntervalLists(self, intervals):
# write your code here
result = []
heap = []
for index, array in enumerate(intervals):
if len(array) == 0:
continue
heapq.heappush(heap, (array[0].start, array[0].end, index, 0))
while len(heap):
start, end, x, y = heap[0]
heapq.heappop(heap)
self.append_and_merge(result, Interval(start, end))
if y + 1 < len(intervals[x]):
heapq.heappush(heap, (intervals[x][y + 1].start, intervals[x][y + 1].end, x, y + 1))
return result
def append_and_merge(self, intervals, interval):
if not intervals:
intervals.append(interval)
return
last_interval = intervals[-1]
if last_interval.end < interval.start:
intervals.append(interval)
return
last_interval.end = max(last_interval.end, interval.end)2.LeetCode Merge Interval (Sorting 做法):
"""
Definition of Interval.
class Interval(object):
def __init__(self, start, end):
self.start = start
self.end = end
"""
class Solution:
"""
@param intervals: the given k sorted interval lists
@return: the new sorted interval list
"""
def mergeKSortedIntervalLists(self, intervals):
# write your code here
arr = []
for i in intervals:
for j in i:
arr.append(j)
arr = sorted(arr, key=lambda o: o.start)
ans = []
if (len(arr) == 0) :
return ans
ans.append(arr[0])
for i in range(1, len(arr)):
if (ans[len(ans) - 1].end >= arr[i].start):
ans[len(ans) - 1].end = max(ans[len(ans) - 1].end, arr[i].end)
else :
ans.append(arr[i])
return ans
例题 6 LeetCode-349 Intersection Of Two Arrays
python做这道题相对来说比较方便,调用将array转换成set后再调用set类的intersection()函数后把结果转换成list就好。
class Solution:
def intersection(self, nums1, nums2):
return list(set(nums1).intersection(set(nums2)))
这么写虽然简单,但是他的时间复杂度为O(N),空间复杂度(额外)也是O(N)。
我们还可以利用sort + binary search 将时间复杂度变成O(NlogN),空间复杂度变成O(1)。
当然也可以用merge two sorted array的方法。 时间复杂度为O(nlogn + mlogm + n + m) if n < m. 空间复杂度 O(1)。
双指针
解题思路
首先,将nums1和nums2排序。
定义双指针i和j,分别指向两个数组。从前向后遍历寻找交集元素
当nums1[i] < nums2[j],i后移1位
当nums1[i] > nums2[j],j后移1位
当nums1[i] == nums2[j],把nums1[i]加入集合intersect,i后移1位,j后移1位
将集合intersect的元素加入res中
class Solution:
"""
@param nums1: an integer array
@param nums2: an integer array
@return: an integer array
"""
def intersection(self, nums1, nums2):
res = []
# 排序
nums1.sort()
nums2.sort()
# 双指针遍历
i, j = 0, 0
intersect = set()
while(i < len(nums1) and j < len(nums2)):
if nums1[i] < nums2[j]:
i += 1
elif nums1[i] > nums2[j]:
j += 1
else:
intersect.add(nums1[i])
i += 1
j += 1
# 把intersect的元素加入到res中
for num in intersect:
res.append(num)
return res
例题 7 654 Sparse Matrix Multiplication
将此题目转换成intersection类型题目来思考。
class Solution:
"""
@param A: a sparse matrix
@param B: a sparse matrix
@return: the result of A * B
"""
def multiply(self, A, B):
# write your code here
row_vectors = self.convert_to_row_vectors(A)
col_vectors = self.convert_to_col_vectors(B)
matrix = []
for row_vector in row_vectors:
row = []
for col_vector in col_vectors:
row.append(self.multi_vector(row_vector, col_vector))
matrix.append(row)
return matrix
def convert_to_row_vectors(self, matrix):
vectors = []
for row in matrix:
vector = []
for index, col in enumerate(row):
if col != 0:
vector.append((index, col))
vectors.append(vector)
return vectors
def convert_to_col_vectors(self, matrix):
n, m = len(matrix), len(matrix[0])
vectors = []
for j in range(m):
vector = []
for i in range(n):
if matrix[i][j] != 0:
vector.append((i, matrix[i][j]))
vectors.append(vector)
return vectors
def multi_vector(self, v1, v2):
i, j = 0, 0
result = 0
while i < len(v1) and j < len(v2):
if v1[i][0] < v2[j][0]:
i += 1
elif v1[i][0] > v2[j][0]:
j += 1
else:
result += v1[i][1] * v2[j][1]
i += 1
j += 1
return result