"""Poll a GitLab project for open issues and announce new ones on Discord.

Every polling interval the script fetches the project's open issues from the
GitLab REST API and posts a short summary of each issue it has not yet
announced to a Discord webhook.  The set of announced issues lives in memory
only, so it is reset whenever the process restarts.
"""
import os

import requests
from rocketry import Rocketry
from rocketry.conds import every

# GitLab API token and repository details.
# SECURITY NOTE(review): the token and webhook URL below are credentials that
# were committed in source.  They are kept only as fallbacks so existing
# deployments keep working; they should be rotated and supplied exclusively
# through the GITLAB_API_TOKEN / DISCORD_WEBHOOK_URL environment variables.
GITLAB_API_TOKEN = os.environ.get('GITLAB_API_TOKEN', 'glpat-8i4z8VbfL9a76YxFqHHC')
GITLAB_API_URL = 'https://gitlab.com/api/v4/projects/codebreaker-challenge%2F2024%2Foutreach-2024/issues?state=opened'
GITLAB_PROJECT_URL = 'https://gitlab.com/codebreaker-challenge/2024/outreach-2024'

# Discord webhook URL
DISCORD_WEBHOOK_URL = os.environ.get(
    'DISCORD_WEBHOOK_URL',
    'https://discord.com/api/webhooks/1285276886871834655/0pRndW9BZjUUmLlcELIT1fLkOXKfe-FEX3WDpoPZT7MMvKvK9V_dFo7FkDfB2W3CRghV',
)

# Upper bound (seconds) on any single HTTP call.  requests has no default
# timeout, so without this a stalled connection would block the task forever.
REQUEST_TIMEOUT_SECONDS = 10

# Maximum number of description characters included in a notification.
MESSAGE_PREVIEW_LENGTH = 50

# Placeholder used when an issue title does not contain an "<email> - " prefix.
UNKNOWN_EMAIL = 'unknown'

# Headers for GitLab API request
headers = {
    'Private-Token': GITLAB_API_TOKEN
}

# Set to keep track of reported issues (GitLab global issue ids)
reported_issues = set()


def _split_title(title):
    """Split a title of the form ``"<email> - <subject>"`` into its two parts.

    Returns:
        An ``(email, subject)`` tuple.  When the title does not contain the
        ``' - '`` separator, the email is ``UNKNOWN_EMAIL`` and the subject is
        the whole title, instead of raising as a bare tuple-unpack would.
    """
    email, separator, subject = title.partition(' - ')
    if not separator:
        return UNKNOWN_EMAIL, title
    return email, subject


def _preview(text, limit=MESSAGE_PREVIEW_LENGTH):
    """Return *text* unchanged if short, else its first *limit* chars + '...'."""
    if len(text) > limit:
        return text[:limit] + '...'
    return text


# Function to send a message to Discord
def send_discord_notification(title, email, subject, message, issue_url):
    """Post a "New Issue" message to the Discord webhook.

    Args:
        title: Full issue title.  Accepted for interface compatibility; it is
            not part of the message because email and subject are derived
            from it.
        email: Reporter email shown in the message.
        subject: Issue subject shown in the message.
        message: Preview of the issue description.
        issue_url: Link to the issue in the GitLab web UI.

    Returns:
        True if Discord accepted the message (any 2xx status), False if the
        request failed or was rejected.
    """
    data = {
        "content": f"**New Issue**\n**Email:** {email}\n**Subject:** {subject}\n**Message:** {message}\n**Link:** {issue_url}"
    }
    try:
        response = requests.post(
            DISCORD_WEBHOOK_URL, json=data, timeout=REQUEST_TIMEOUT_SECONDS
        )
    except requests.RequestException as exc:
        print(f"Failed to send notification. Error: {exc}")
        return False

    # Accept any 2xx rather than only 204, so a delivered message is never
    # mistaken for a failure and sent again on the next poll.
    if response.ok:
        print("Notification sent successfully.")
        return True
    print(f"Failed to send notification. Status code: {response.status_code}")
    return False


# Function to check for new issues
def check_for_new_issues():
    """Fetch open issues from GitLab and notify Discord about unseen ones.

    An issue is added to ``reported_issues`` only after its notification was
    delivered, so a failed delivery is retried on the next poll.  Fetch and
    decode failures are printed and the poll is skipped; nothing is raised.
    """
    try:
        response = requests.get(
            GITLAB_API_URL, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS
        )
    except requests.RequestException as exc:
        print(f"Failed to fetch issues. Error: {exc}")
        return

    if response.status_code != 200:
        print(f"Failed to fetch issues. Status code: {response.status_code}")
        return

    try:
        issues = response.json()
    except ValueError as exc:
        print(f"Failed to fetch issues. Invalid JSON in response: {exc}")
        return

    for issue in issues:
        if issue['id'] in reported_issues:
            continue

        title = issue['title']
        email, subject = _split_title(title)
        # The description is treated as optional: a missing or None value
        # becomes an empty preview instead of crashing on len(None).
        message = _preview(issue.get('description') or '')
        issue_url = f"{GITLAB_PROJECT_URL}/-/issues/{issue['iid']}"

        if send_discord_notification(title, email, subject, message, issue_url):
            reported_issues.add(issue['id'])


# Create a Rocketry app
app = Rocketry()


# Schedule the task to run every 20 seconds
@app.task(every('20 seconds'))
def scheduled_task():
    """Rocketry task entry point: run one polling cycle."""
    check_for_new_issues()


# Run the Rocketry app
if __name__ == "__main__":
    app.run()