pandas' apply function is typically used for complex traversal operations (running a custom function while iterating over an iterable object). It is highly customizable and more efficient than alternatives such as iterrows or plain for loops, which makes it one of my favorite and most frequently used functions. apply's main parameters and their descriptions can be found in the official documentation (which already covers them in detail):
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.apply.html
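
Before the business examples below, here is a minimal self-contained sketch of the two axis modes (toy data, not from the project):

```python
import pandas as pd

# Toy DataFrame just to illustrate apply (hypothetical data).
df = pd.DataFrame({'a': [1, 2, 3], 'b': [10, 20, 30]})

# axis=0 (the default): the function receives each column as a Series.
col_sums = df.apply(lambda s: s.sum())

# axis=1: the function receives each row as a Series.
row_sums = df.apply(lambda row: row['a'] + row['b'], axis=1)

print(col_sums.tolist())  # [6, 60]
print(row_sums.tolist())  # [11, 22, 33]
```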

Performing complex operations on every row of a DataFrame

As an example, let's compute the trajectory similarity between the two people described by each row of a DataFrame. Since the code is business-related, the sub-functions cannot be disclosed; the focus here is on how apply is used.

```python
import numpy as np


def calculate_trajectory_similarity(df, trail_dict):
    """
    :param trail_dict: mapping from record id to a list of trail points
    :param df: DataFrame where each row describes a pair of people
    :return: df with the trajectory_similarity column filled in
    """
    body_threshold = 2
    similarity_threshold = 0.6
    insert_columns = [
        'trajectory_similarity',
    ]
    for c in insert_columns:
        try:
            # insert the result column, initialised to 0
            df.insert(df.shape[1], c, 0)
        except ValueError as v:
            # the column already exists
            print(str(v))

    def calc_trajectory_similarity(element):
        """Compute the trajectory similarity for one row (a Series)"""
        trail01 = trail_dict.get(element['record_id1'])
        trail02 = trail_dict.get(element['record_id2'])
        trail_arr1 = np.array(trail01)
        trail_arr2 = np.array(trail02)
        # split each trail into its face points and body points
        face_trail01 = trail_arr1[trail_arr1[:, -1] == 'face']
        body_trail01 = trail_arr1[trail_arr1[:, -1] == 'body']
        face_trail02 = trail_arr2[trail_arr2[:, -1] == 'face']
        body_trail02 = trail_arr2[trail_arr2[:, -1] == 'body']
        sub_trail_list = [
            (face_trail01, face_trail02),
            (face_trail01, body_trail02),
            (body_trail01, face_trail02),
            (body_trail01, body_trail02)
        ]
        # average body width of the two people
        avg_body_width = (element['body_width1'] + element['body_width2']) / 2
        tr_similarity_list = []
        for s in sub_trail_list:
            coordinate_arr1, coordinate_arr2, time_overlap = get_real_time_coordinates(s[0], s[1])
            if time_overlap > 0:
                sub_similarity = trajectory_similarity(coordinate_arr1, coordinate_arr2, avg_body_width,
                                                       body_threshold, similarity_threshold)
            else:
                sub_similarity = 0
            tr_similarity_list.append((min(len(coordinate_arr1), len(coordinate_arr2)), sub_similarity))
        if len(tr_similarity_list) > 0:
            weights = [i[0] for i in tr_similarity_list]
            if np.sum(weights) > 0:  # weighted average of the sub-trail similarities
                tr_similarity = np.sum([w * s for w, s in tr_similarity_list]) / np.sum(weights)
            else:
                tr_similarity = 0
        else:
            tr_similarity = 0
        element['trajectory_similarity'] = tr_similarity
        return element

    df = df.apply(calc_trajectory_similarity, axis=1)
    return df
```

The core operation is df = df.apply(calc_trajectory_similarity, axis=1). This line uses apply to call the calc_trajectory_similarity function on the DataFrame row by row; each row (a Series object) supplies the fields needed to compute the trajectory similarity, which is then stored back into the DataFrame. get_real_time_coordinates and trajectory_similarity are custom functions that collect real-time points and compute trajectory similarity, respectively; they can be ignored here.
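
The same pattern can be shown as a minimal runnable sketch (the column names below are made up for illustration): pre-insert a result column, then let the row function fill it in and return the whole row.

```python
import pandas as pd

# Hypothetical pair data standing in for the real business fields.
df = pd.DataFrame({'width1': [2.0, 4.0], 'width2': [4.0, 4.0]})

# Pre-insert the result column, as in the snippet above.
if 'avg_width' not in df.columns:
    df.insert(df.shape[1], 'avg_width', 0)

def fill_avg_width(element):
    """element is one row (a Series); write the result and return the row."""
    element['avg_width'] = (element['width1'] + element['width2']) / 2
    return element

df = df.apply(fill_avg_width, axis=1)
print(df['avg_width'].tolist())  # [3.0, 4.0]
```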

Performing complex operations on every element of a Series

As an example, suppose we have some raw trajectory data that needs preprocessing. Each DataFrame field that needs processing (a Series) can be operated on individually.

```python
import re

import pandas as pd


def split_to_int(element):
    """XXX"""
    if element:
        return list(map(int, re.findall(r"[\d]+", element)))
    else:
        element = []
        return element


def split_to_list(element):
    """XXX"""
    if element:
        element = list(re.findall(r"[\d]+", element))
        element = list(map(convert_time, element))
        return element
    else:
        element = []
        return element


def trail_string_processing(df):
    """
    :param df:
    :return:
    """
    # silence the chained-assignment warning
    pd.set_option('mode.chained_assignment', None)
    trail_name = [
        'trail_left_top_x',
        'trail_left_top_y',
        'trail_right_btm_x',
        'trail_right_btm_y',
    ]
    for t in trail_name:
        df.loc[:, t] = df[t].apply(split_to_int)
    return df


def time_string_processing(df):
    """
    XXX
    :param df:
    :return:
    """
    # silence the chained-assignment warning
    pd.set_option('mode.chained_assignment', None)
    df.loc[:, 'trail_point_time'] = df['trail_point_time'].apply(split_to_list)
    # normalise timezone-aware timestamps to naive Asia/Shanghai time
    df.loc[:, 'shot_time'] = df['shot_time'].apply(
        lambda x: x.tz_convert('Asia/Shanghai').tz_localize(None) if x.tz else x)
    return df
```

In the code above, each apply operates on a Series, and the function passed to apply can be either a custom function or an anonymous lambda function.
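
As a minimal runnable sketch of this pattern (toy strings; split_to_int here is a simplified stand-alone copy of the function above):

```python
import re

import pandas as pd

# Toy strings shaped like the raw trail fields (hypothetical data).
s = pd.Series(['[1, 2, 3]', '[10,20]', None])

def split_to_int(element):
    """Extract every integer in the string; empty values become []."""
    if element:
        return list(map(int, re.findall(r"\d+", element)))
    return []

cleaned = s.apply(split_to_int)
print(cleaned.tolist())  # [[1, 2, 3], [10, 20], []]
```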

Performing complex operations on a GroupBy object

As an example, suppose a DataFrame needs to be grouped by certain fields, some operations performed on each group, and the results reassembled into a new DataFrame. All of this can be done with apply.

```python
import pandas as pd


def merge_key_person_info(df):
    """
    XXXX
    :param df:
    :return:
    """

    def group_by_key_person(element):
        element = element.drop_duplicates(subset=['pvid', 'rel_pvid'])
        # the cap on group size depends on the key-person type
        key_person_code = element['key_person_code'].iloc[0]
        if key_person_code == 'tag_is_family':
            max_members_num = 6
        else:
            max_members_num = 11
        key_person_num = len(element['pvid'].iloc[0].split(','))
        num_k = max_members_num - key_person_num
        num_k = num_k if num_k > 1 else 1
        element = element.sort_values(by=['relation_score'], ascending=False).iloc[:num_k, :]
        # aggregate the group's columns into lists
        key_person_score = list(set(element['key_person_score'].values))
        rel_pvid_list = list(element['rel_pvid'].values)
        relation_code_list = list(element['relation_code'].values)
        relation_score_list = list(element['relation_score'].values)
        start_time_list = list(element['relation_info_start_time'].values)
        end_time_list = list(element['relation_info_end_time'].values)
        series_dict = {
            'pvid': element['pvid'].iloc[0],
            'corp_id': element['corp_id'].iloc[0],
            'key_person_code': element['key_person_code'].iloc[0],
            'key_person_score': key_person_score,
            'rel_pvid': rel_pvid_list,
            'relation_code': relation_code_list,
            'relation_score': relation_score_list,
            'relation_info_start_time': start_time_list,
            'relation_info_end_time': end_time_list
        }
        result = pd.Series(series_dict)
        return result

    # one merged row (Series) per (pvid, corp_id, key_person_code) group
    group_by_obj = df.groupby(by=['pvid', 'corp_id', 'key_person_code'])
    group_df = group_by_obj.apply(group_by_key_person).reset_index(drop=True)
    return group_df
```
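
A stripped-down runnable sketch of the same pattern (toy columns, hypothetical data): the function passed to apply receives each group as a DataFrame and returns one Series, and apply stacks the Series into a new DataFrame.

```python
import pandas as pd

# Toy records: several rows per key, to be merged into one row per group.
df = pd.DataFrame({
    'pvid': ['a', 'a', 'b'],
    'rel_pvid': ['x', 'y', 'z'],
    'relation_score': [0.9, 0.5, 0.7],
})

def merge_group(g):
    """Collapse one group (a DataFrame) into a single Series."""
    g = g.sort_values(by='relation_score', ascending=False)
    return pd.Series({
        'rel_pvid': list(g['rel_pvid']),
        'relation_score': list(g['relation_score']),
    })

# the group key comes back as the index; reset_index turns it into a column
merged = df.groupby('pvid').apply(merge_group).reset_index()
print(merged['rel_pvid'].tolist())  # [['x', 'y'], ['z']]
```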

Sometimes, to improve performance, an apply that involves heavy numerical computation can be replaced with numpy's np.apply_along_axis.

```python
import numpy as np
import pandas as pd


def calculate_speed_and_angle_similarity(parameters_df):
    """
    :param parameters_df:
    :return:
    """
    try:
        # insert the result columns, initialised to 0
        parameters_df.insert(parameters_df.shape[1], 'angle_similarity', 0)
        parameters_df.insert(parameters_df.shape[1], 'speed_similarity', 0)
    except ValueError as v:
        # the columns already exist
        logger = my_logger()
        logger.info(str(v))

    def calc_angle_speed_similarity(element):
        """XXXX"""
        angle1 = element[35]
        angle2 = element[83]

        moving_speed1 = element[43]
        moving_speed2 = element[91]

        # angle similarity: cosine of the angle difference
        angle_difference = abs(angle1 - angle2)
        if angle_difference >= 90:  # too far apart to count as similar
            angle_similarity = 0
        else:
            angle_similarity = np.cos(abs(angle1 - angle2) / 180 * np.pi)
        element[102] = angle_similarity

        # speed similarity: ratio of the slower to the faster speed
        slower_speed = min(moving_speed1, moving_speed2)
        faster_speed = max(moving_speed1, moving_speed2)
        speed_similarity = slower_speed / faster_speed
        element[103] = speed_similarity
        return element

    arr = parameters_df.values
    new_arr = np.apply_along_axis(calc_angle_speed_similarity, axis=1, arr=arr)
    parameters_df = pd.DataFrame(new_arr, columns=parameters_df.columns)
    return parameters_df
```

Written this way, the code does run somewhat faster, but since an ndarray does not support string indexing, fields can only be accessed by position, which is error-prone and hurts readability. I don't recommend this approach for complex functions; np.apply_along_axis is better suited to simple computations.
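
For a simple computation the pattern looks like this (toy data; positions replace column names because each row arrives as a plain ndarray):

```python
import numpy as np
import pandas as pd

# Hypothetical speeds for two people per row.
df = pd.DataFrame({'speed1': [2.0, 5.0], 'speed2': [4.0, 5.0]})

def speed_similarity(row):
    """row is a 1-D ndarray: fields must be addressed by position."""
    slower, faster = min(row[0], row[1]), max(row[0], row[1])
    return slower / faster

sims = np.apply_along_axis(speed_similarity, axis=1, arr=df.values)
print(sims.tolist())  # [0.5, 1.0]
```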

The code above consists of fragments taken from several modules, intended only to demonstrate how apply is used, so it will not run as-is; please bear with that. For information security reasons, the original comments and detailed code have been removed.