pandas的apply函数通常用于一些复杂的遍历操作(遍历可迭代对象的同时执行一些自定义函数),它的可定制程度高,而且比itterrows、for等操作效率更高,是我非常喜欢而且常用的一个函数。apply的主要参数和对应说明可以查看官网(里面已经说得很详细)
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.apply.html
对DataFrame的每一行执行一些复杂的操作
举一个例子,计算DataFrame中每一条数据中两个人的轨迹相似度,因为和业务相关,里面的子函数不能透露,这里重点呈现apply的用法。
import numpy as np def calculate_trajectory_similarity(df, trail_dict): """ :param trail_dict: :param df: :return: """ body_threshold = 2 similarity_threshold = 0.6 insert_columns = [ 'trajectory_similarity', ] for c in insert_columns: try: # df.insert(df.shape[1], c, 0) except ValueError as v: # print(str(v)) def calc_trajectory_similarity(element): """计算XXX""" trail01 = trail_dict.get(element['record_id1']) trail02 = trail_dict.get(element['record_id2']) trail_arr1 = np.array(trail01) trail_arr2 = np.array(trail02) face_trail01 = trail_arr1[trail_arr1[:, -1] == 'face'] body_trail01 = trail_arr1[trail_arr1[:, -1] == 'body'] face_trail02 = trail_arr2[trail_arr2[:, -1] == 'face'] body_trail02 = trail_arr2[trail_arr2[:, -1] == 'body'] sub_trail_list = [ (face_trail01, face_trail02), (face_trail01, body_trail02), (body_trail01, face_trail02), (body_trail01, body_trail02) ] # avg_body_width = (element['body_width1'] + element['body_width2']) / 2 tr_similarity_list = [] for s in sub_trail_list: coordinate_arr1, coordinate_arr2, time_overlap = get_real_time_coordinates(s[0], s[1]) if time_overlap > 0: sub_similarity = trajectory_similarity(coordinate_arr1, coordinate_arr2, avg_body_width, body_threshold, similarity_threshold) else: sub_similarity = 0 tr_similarity_list.append((min(len(coordinate_arr1), len(coordinate_arr2)), sub_similarity)) if len(tr_similarity_list) > 0: weights = [i[0] for i in tr_similarity_list] if np.sum(weights) > 0: # tr_similarity = np.sum([w * s for w, s in tr_similarity_list]) / np.sum(weights) else: tr_similarity = 0 else: tr_similarity = 0 element['trajectory_similarity'] = tr_similarity return element df = df.apply(calc_trajectory_similarity, axis=1) return df
里面最核心的操作是df = df.apply(calc_trajectory_similarity, axis=1)
,这行代码通过apply调用了calc_trajectory_similarity这个函数,并按照行遍历DataFrame,利用每一行(Series对象)的一些字段信息,计算出轨迹相似度,并存储到DataFrame中。get_real_time_coordinates
和trajectory_similarity
分别是统计实时点和计算轨迹相似度的自定义函数,在这里可以不用关注。
对Series的每一个元素执行一些复杂操作
举个例子,现有一些原始的轨迹数据,需要进行预处理,可以针对需要处理的DataFrame字段(Series格式)单独进行操作。
import re def split_to_int(element): """XXX""" if element: return list(map(int, re.findall(r"[\d]+", element))) else: element = [] return element def split_to_list(element): """XXX""" if element: element = list(re.findall(r"[\d]+", element)) element = list(map(convert_time, element)) return element else: element = [] return element def trail_string_processing(df): """ :param df: :return: """ # pd.set_option('mode.chained_assignment', None) trail_name = [ 'trail_left_top_x', 'trail_left_top_y', 'trail_right_btm_x', 'trail_right_btm_y', ] for t in trail_name: df.loc[:, t] = df[t].apply(split_to_int) return df def time_string_processing(df): """ XXX :param df: :return: """ # XXX pd.set_option('mode.chained_assignment', None) df.loc[:, 'trail_point_time'] = df['trail_point_time'].apply(split_to_list) # df.loc[:, 'shot_time'] = df['shot_time'].apply( lambda x: x.tz_convert('Asia/Shanghai').tz_localize(None) if x.tz else x) return df
在上面的代码中,每一个apply都是针对series执行的操作,apply里面的函数可以是自定义函数,也可以是lambda匿名函数。
对GroupBy对象执行一些复杂操作
举个例子,现有一个DataFrame需要按照某些字段进行分组,然后对分组后的对象执行一些操作,然后重构为新的DataFrame,这时可以通过apply来实现。
import pandas as pd def merge_key_person_info(df): """ XXXX :param df: :return: """ def group_by_key_person(element): element = element.drop_duplicates(subset=['pvid', 'rel_pvid']) # key_person_code = element['key_person_code'].iloc[0] if key_person_code == 'tag_is_family': max_members_num = 6 else: max_members_num = 11 key_person_num = len(element['pvid'].iloc[0].split(',')) num_k = max_members_num - key_person_num num_k = num_k if num_k > 1 else 1 element = element.sort_values(by=['relation_score'], ascending=False).iloc[:num_k, :] # key_person_score = list(set(element['key_person_score'].values)) rel_pvid_list = list(element['rel_pvid'].values) relation_code_list = list(element['relation_code'].values) relation_score_list = list(element['relation_score'].values) start_time_list = list(element['relation_info_start_time'].values) end_time_list = list(element['relation_info_end_time'].values) series_dict = { 'pvid': element['pvid'].iloc[0], 'corp_id': element['corp_id'].iloc[0], 'key_person_code': element['key_person_code'].iloc[0], 'key_person_score': key_person_score, 'rel_pvid': rel_pvid_list, 'relation_code': relation_code_list, 'relation_score': relation_score_list, 'relation_info_start_time': start_time_list, 'relation_info_end_time': end_time_list } result = pd.Series(series_dict) return result # group_by_obj = df.groupby(by=['pvid', 'corp_id', 'key_person_code']) group_df = group_by_obj.apply(group_by_key_person).reset_index(drop=True) return group_df
有时候为了提升效率,一些涉及到大量数值计算的apply可以使用numpy的.apply_along_axis替代。
def calculate_speed_and_angle_similarity(parameters_df): """ :param parameters_df: :return: """ try: # parameters_df.insert(parameters_df.shape[1], 'angle_similarity', 0) parameters_df.insert(parameters_df.shape[1], 'speed_similarity', 0) except ValueError as v: # logger = my_logger() logger.info(str(v)) def calc_angle_speed_similarity(element): """XXXX""" angle1 = element[35] angle2 = element[83] moving_speed1 = element[43] moving_speed2 = element[91] # angle_difference = abs(angle1 - angle2) if angle_difference >= 90: # angle_similarity = 0 else: angle_similarity = np.cos(abs(angle1 - angle2) / 180 * np.pi) element[102] = angle_similarity # slower_speed = min(moving_speed1, moving_speed2) faster_speed = max(moving_speed1, moving_speed2) speed_similarity = slower_speed / faster_speed element[103] = speed_similarity return element arr = parameters_df.values new_arr = np.apply_along_axis(calc_angle_speed_similarity, axis=1, arr=arr) parameters_df = pd.DataFrame(new_arr, columns=parameters_df.columns) return parameters_df
按照上述写法,虽然可以在一定程度上提升运行速度,但由于ndarray不支持字符串索引,对字段的操作只能按照序号来进行,很容易出错,代码可读性也比较差,不太推荐在复杂函数中使用,简单的计算用np.apply_along_axis会比较适合。
上面的代码都是一些模块的片段,只是用来展示apply的用法,因此无法跑通,请多包涵。为了信息安全,所有注释和细节代码都删除了。
热门文章
- Java中线程状态+线程安全问题+synchronized的用法详解_java_
- 3月30日 | SingBox每天更新19.4M/S免费节点链接地址分享
- 动物疫苗接种证书怎么查询(动物疫苗查询真伪)
- 3月21日 | SingBox每天更新19.4M/S免费节点链接地址分享
- 妙三多猫疫苗打完有什么不良反应(妙三多猫疫苗打两针还是三针)
- SSM三大框架工作流程与原理详解
- 瑞鹏宠物医院有限公司怎么样(瑞鹏宠物医院有限公司怎么样啊)
- 被猫抓出血怎么样判断有没有事(被猫抓出血怎么样判断有没有事儿)
- 3月23日 | SingBox每天更新22.5M/S免费节点链接地址分享
- 宠物粮食排名榜最新(宠物粮食排名榜最新)