import json
import re
import os
def ensure_json_values(bson_str):
    """Normalize a MongoDB-shell-style dump so its values parse as JSON.

    Two rewrites are applied, in order:
    1. Any bare (unquoted) value after a quoted key is wrapped in double
       quotes, e.g. ``"n": 5,`` -> ``"n": "5",``.
    2. For Mongo constructor calls such as ``ObjectId("abc")`` (now quoted
       by step 1 as ``"ObjectId("abc")"``), the inner quotes are stripped
       so the whole call is one JSON string: ``"ObjectId(abc)"``.

    :param bson_str: raw text of a Mongo-style document dump
    :return: the same text with every value rendered as a JSON string
    """
    # Matches:  <indent> "key":  <bare value>  <, or newline>
    quote_bare_value = re.compile(
        r'([ \t]*)(\"[^\"]*\"\s*:\s*)([^\",\{\}\[\]\s].*?)([,\n])')
    # Matches a quoted special-type constructor whose argument is itself
    # quoted, capturing indent / "Type( / arg / )… / trailing , or newline.
    special_ctor = re.compile(
        r'([ \t]*)(\"(?:ObjectId|ISODate|NumberInt|NumberLong|NumberDecimal|Binary|Boolean|Timestamp|RegExp|DBRef|JavaScript code|Symbol|MinKey|MaxKey)\()\"([^\"\)]*)\"(\).+?)([,\n])')

    def _wrap_in_quotes(m):
        # Surround the bare value (group 3) with double quotes.
        return f'{m.group(1)}{m.group(2)}"{m.group(3)}"{m.group(4)}'

    def _strip_inner_quotes(m):
        # Drop the quotes around the constructor argument, keeping the
        # surrounding pieces intact.
        argument = m.group(3).replace('"', "")
        return f'{m.group(1)}{m.group(2)}{argument}{m.group(4)}{m.group(5)}'

    bson_str = re.sub(quote_bare_value, _wrap_in_quotes, bson_str)
    return re.sub(special_ctor, _strip_inner_quotes, bson_str)
def analyze_json_objects(file_path):
    """Parse a dump file of blank-line-separated JSON documents.

    The file content is first run through ensure_json_values() so
    Mongo-shell-style values become valid JSON, then split on blank
    lines; each chunk is parsed independently and the resulting dicts
    are grouped by their "_id" field.

    :param file_path: path to the dump file (expected to be UTF-8)
    :return: dict mapping each _id to the list of documents carrying it
    """
    # JSON interchange text is conventionally UTF-8 (RFC 8259); relying
    # on the platform default encoding breaks e.g. on Windows.
    with open(file_path, 'r', encoding='utf-8') as file:
        content = ensure_json_values(file.read())

    json_objects = []
    # Documents in the dump are separated by one or more blank lines.
    for chunk in re.split(r'\n\s*\n', content.strip()):
        if not chunk:
            continue
        try:
            json_objects.append(json.loads(chunk))
        except json.JSONDecodeError:
            # Best-effort: report the bad chunk and keep going.
            print(f'Error decoding JSON from string: {chunk}')

    id_to_json_map = {}
    for obj in json_objects:
        # Only dict-shaped documents can carry an "_id".
        if isinstance(obj, dict):
            json_id = obj.get('_id')
            if json_id is not None:
                # Group every document sharing the same _id together.
                id_to_json_map.setdefault(json_id, []).append(obj)
    return id_to_json_map
def compare_json_objects(id_to_json_map):
    """Write a field-by-field diff report for documents sharing an _id.

    Only ids that appear exactly twice (one document from each table)
    are compared; for each such pair the report lists keys unique to
    either side and keys whose values differ. The report is written to
    <repo-root>/workspace/output.txt.

    :param id_to_json_map: mapping of _id -> list of parsed documents,
        as produced by analyze_json_objects()
    """
    output_lines = []
    for json_id, json_list in id_to_json_map.items():
        # A meaningful two-table diff needs exactly one document per side.
        if len(json_list) != 2:
            continue
        first, second = json_list
        first_keys = set(first.keys())
        second_keys = set(second.keys())
        intersect_keys = first_keys & second_keys
        unique_first = first_keys - second_keys
        unique_second = second_keys - first_keys
        output_lines.append(f"比较的MongoId: {json_id}")
        # Iterate keys in sorted order: raw set iteration order varies
        # between runs (hash randomization), which made successive
        # reports non-reproducible and hard to diff.
        if unique_first:
            output_lines.append("新表中独有的:")
            for key in sorted(unique_first):
                output_lines.append(f" {key}: {first.get(key)}")
        if unique_second:
            output_lines.append("旧表中独有的:")
            for key in sorted(unique_second):
                output_lines.append(f" {key}: {second.get(key)}")
        differing_values = [
            f" {key}: {first.get(key)} vs {second.get(key)}"
            for key in sorted(intersect_keys)
            if first.get(key) != second.get(key)
        ]
        if differing_values:
            output_lines.append("两表value不同的:")
            output_lines.extend(differing_values)
        output_lines.append("")

    base_path = get_base_path(os.getcwd(), 'tendata-warehouse')
    # The output path can be customized here.
    file_path = os.path.join(base_path, "workspace", "output.txt")
    # The report contains Chinese text; write it as UTF-8 explicitly so
    # it does not depend on the platform default encoding.
    with open(file_path, 'w', encoding='utf-8') as output_file:
        output_file.write('\n'.join(output_lines))
    print("可在服务器查看{}".format(file_path))
def get_base_path(current_path, target_folder):
    """Return the prefix of *current_path* up to and including *target_folder*.

    :param current_path: an absolute or relative filesystem path
    :param target_folder: the path component to anchor on
    :return: the path truncated just after *target_folder*
    :raises ValueError: (from list.index) if *target_folder* is not a
        component of *current_path*
    """
    components = current_path.split(os.sep)
    anchor = components.index(target_folder) + 1
    return os.sep.join(components[:anchor])
def main():
    """Entry point: locate the repo root, prompt for a dump file, diff it."""
    base_path = get_base_path(os.getcwd(), 'tendata-warehouse')
    relative_path = input("请输入比较文件的相对路径 Path From Content Root: ")
    dump_file = os.path.join(base_path, relative_path)
    compare_json_objects(analyze_json_objects(dump_file))


if __name__ == "__main__":
    main()